Skip to content

Commit

Permalink
[FLINK-10145] [table] Add replace function in Table API and SQL
Browse files Browse the repository at this point in the history
This closes apache#6576.
  • Loading branch information
Guibo-Pan authored and zhangxinyu committed Sep 27, 2018
1 parent c0c80fe commit 13aca23
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 0 deletions.
36 changes: 36 additions & 0 deletions docs/dev/table/functions.md
Expand Up @@ -2448,6 +2448,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ])
</td>
</tr>

<tr>
<td>
{% highlight text %}
REPLACE(string1, string2, string3)
{% endhighlight %}
</td>
<td>
<p>Returns a new string which replaces all the occurrences of <i>string2</i> with <i>string3</i> (non-overlapping) from <i>string1</i></p>
<p>E.g., <code>REPLACE("hello world", "world", "flink")</code> returns "hello flink"; <code>REPLACE("ababab", "abab", "z")</code> returns "zab".</p>
</td>
</tr>

<tr>
<td>
{% highlight text %}
Expand Down Expand Up @@ -2688,6 +2700,18 @@ STRING.substring(INT1, INT2)
</td>
</tr>

<tr>
<td>
{% highlight java %}
STRING1.replace(STRING2, STRING3)
{% endhighlight %}
</td>
<td>
<p>Returns a new string which replaces all the occurrences of <i>STRING2</i> with <i>STRING3</i> (non-overlapping) from <i>STRING1</i>.</p>
<p>E.g., <code>'hello world'.replace('world', 'flink')</code> returns 'hello flink'; <code>'ababab'.replace('abab', 'z')</code> returns 'zab'.</p>
</td>
</tr>

<tr>
<td>
{% highlight java %}
Expand Down Expand Up @@ -2927,6 +2951,18 @@ STRING.substring(INT1, INT2)
</td>
</tr>

<tr>
<td>
{% highlight scala %}
STRING1.replace(STRING2, STRING3)
{% endhighlight %}
</td>
<td>
<p>Returns a new string which replaces all the occurrences of <i>STRING2</i> with <i>STRING3</i> (non-overlapping) from <i>STRING1</i>.</p>
<p>E.g., <code>"hello world".replace("world", "flink")</code> returns "hello flink"; <code>"ababab".replace("abab", "z")</code> returns "zab".</p>
</td>
</tr>

<tr>
<td>
{% highlight scala %}
Expand Down
Expand Up @@ -471,6 +471,13 @@ trait ImplicitExpressionOperations {
}
}

/**
* Returns a new string which replaces all the occurrences of the search target
* with the replacement string (non-overlapping).
*/
def replace(search: Expression, replacement: Expression) =
Replace(expr, search, replacement)

/**
* Returns the length of a string.
*/
Expand Down
Expand Up @@ -153,6 +153,12 @@ object FunctionGenerator {
STRING_TYPE_INFO,
BuiltInMethods.REGEXP_REPLACE)

addSqlFunctionMethod(
REPLACE,
Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
STRING_TYPE_INFO,
BuiltInMethod.REPLACE.method)

addSqlFunctionMethod(
FROM_BASE64,
Seq(STRING_TYPE_INFO),
Expand Down
Expand Up @@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT

override def toString: String = s"($str).repeat($n)"
}

/**
* Returns a new string which replaces all the occurrences of the search target
* with the replacement string (non-overlapping).
*/
case class Replace(str: Expression,
search: Expression,
replacement: Expression) extends Expression with InputTypeSpec {

def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))

override private[flink] def children: Seq[Expression] = str :: search :: replacement :: Nil

override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO

override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)

override def toString: String = s"($str).replace($search, $replacement)"

override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.call(SqlStdOperatorTable.REPLACE, children.map(_.toRexNode))
}
}
Expand Up @@ -193,6 +193,7 @@ object FunctionCatalog {
"lowerCase" -> classOf[Lower],
"similar" -> classOf[Similar],
"substring" -> classOf[Substring],
"replace" -> classOf[Replace],
"trim" -> classOf[Trim],
"upper" -> classOf[Upper],
"upperCase" -> classOf[Upper],
Expand Down Expand Up @@ -444,6 +445,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.RAND_INTEGER,
ScalarSqlFunctions.CONCAT,
ScalarSqlFunctions.CONCAT_WS,
SqlStdOperatorTable.REPLACE,
ScalarSqlFunctions.BIN,
ScalarSqlFunctions.HEX,
SqlStdOperatorTable.TIMESTAMP_ADD,
Expand Down
Expand Up @@ -94,6 +94,39 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"his is a test String.")
}

@Test
def testReplace(): Unit = {
testAllApis(
'f0.replace(" ", "_"),
"f0.replace(' ', '_')",
"REPLACE(f0, ' ', '_')",
"This_is_a_test_String.")

testAllApis(
'f0.replace("i", ""),
"f0.replace('i', '')",
"REPLACE(f0, 'i', '')",
"Ths s a test Strng.")

testAllApis(
'f33.replace("i", ""),
"f33.replace('i', '')",
"REPLACE(f33, 'i', '')",
"null")

testAllApis(
'f0.replace(Null(Types.STRING), ""),
"f0.replace(Null(STRING), '')",
"REPLACE(f0, NULLIF('', ''), '')",
"null")

testAllApis(
'f0.replace(" ", Null(Types.STRING)),
"f0.replace(' ', Null(STRING))",
"REPLACE(f0, ' ', NULLIF('', ''))",
"null")
}

@Test
def testTrim(): Unit = {
testAllApis(
Expand Down
Expand Up @@ -155,6 +155,7 @@ class SqlExpressionTest extends ExpressionTestBase {
"REPEAT('This is a test String.', 2)",
"This is a test String.This is a test String.")
testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
testSqlApi("REPLACE('hello world', 'world', 'flink')", "hello flink")
}

@Test
Expand Down

0 comments on commit 13aca23

Please sign in to comment.