In [1]:
# Make the local Spark installation importable as `pyspark` before importing it below.
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
# Entry point for the RDD API used throughout this notebook; stopped in the last cell.
sc = pyspark.SparkContext(appName='Data Replication')

In [4]:
# Read the input TSV and split every line into its tab-separated fields.
rdd_1 = sc.textFile("hdfs:///user/root/01_multiply_data_input.tsv").map(lambda line: line.split("\t"))
# Preview only: `take` bounds how many rows are pulled back to the driver,
# whereas `collect()` would materialize the whole HDFS file in driver memory.
rdd_1.take(20)

[['a', 'a', '1'], ['a', 'a', '2'], ['a', 'b', '1']]

In [5]:
# Number of copies to produce for every input row; copy ids are "1" .. str(N_COPIES).
N_COPIES = 2
rdd_2 = sc.parallelize([str(i) for i in range(1, N_COPIES + 1)])
rdd_2.collect()

['1', '2']

In [6]:
def f(x, sep="_"):
    """Build one replicated row from a ``(fields, copy_id)`` pair.

    ``x`` is one element of ``rdd_1.cartesian(rdd_2)``. ``x[0]`` is the list
    of fields split from an input TSV line, and ``x[1]`` is a copy id string.
    The copy id is appended to the second field, so each replica of a row gets
    a distinct value in that column. For example,
    ``(['a', 'a', '1'], '2') -> ['a', 'a_2', '1']``.

    Args:
        x: ``(fields, copy_id)`` pair; ``fields`` needs at least two entries.
        sep: separator placed between the second field and the copy id.

    Returns:
        A new list ``[fields[0], fields[1] + sep + copy_id] + fields[2:]``.
        Every field after the second is kept unchanged; the previous
        version silently dropped everything past the third field.
    """
    fields, copy_id = x[0], x[1]
    return [fields[0], fields[1] + sep + copy_id] + list(fields[2:])

In [7]:
# Pair every input row with every copy id, then turn each pair into a suffixed row.
rdd_3 = (
    rdd_1
    .cartesian(rdd_2)  # (fields, copy_id) for each row x copy combination
    .map(f)
)
rdd_3.collect()

[['a', 'a_1', '1'],
 ['a', 'a_1', '2'],
 ['a', 'a_2', '1'],
 ['a', 'a_2', '2'],
 ['a', 'b_1', '1'],
 ['a', 'b_2', '1']]

In [8]:
def g(x, sep="\t"):
    """Serialize one row as a single delimited text line.

    Args:
        x: sequence of string fields, e.g. ``['a', 'a_1', '1']``.
        sep: field delimiter. It is tab by default so the output matches the
            TSV input format.

    Returns:
        The fields joined by ``sep``, e.g. ``'a\\ta_1\\t1'``.
        An empty row gives ``''``.
    """
    return sep.join(x)

In [9]:
# Render each replicated row back into a tab-separated text line.
rdd_4 = rdd_3.map(g)
rdd_4.collect()

['a\ta_1\t1', 'a\ta_1\t2', 'a\ta_2\t1', 'a\ta_2\t2', 'a\tb_1\t1', 'a\tb_2\t1']

In [10]:
# Probe for RDD.toDF before any SparkSession exists (the output below shows it is absent).
callable(getattr(rdd_1, "toDF", None))

False

In [11]:
# Wrap the existing SparkContext in a SparkSession so the DataFrame API is available.
ss = pyspark.sql.SparkSession(sparkContext=sc)

In [12]:
# Same probe after the SparkSession was created: toDF is now available on the RDD.
callable(getattr(rdd_1, "toDF", None))

True

In [13]:
# The input rows as 3-tuples: the record shape handed to toDF in the next cell.
(
    rdd_1
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .collect()
)

[('a', 'a', '1'), ('a', 'a', '2'), ('a', 'b', '1')]

In [14]:
# DataFrame view of the original input: one column per TSV field.
df_1 = (
    rdd_1
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .toDF(schema=["col_1", "col_2", "col_3"])
)
df_1.show()

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|    a|    1|
|    a|    a|    2|
|    a|    b|    1|
+-----+-----+-----+



In [15]:
# DataFrame view of the replicated rows; col_2 now carries the copy suffix.
df_2 = (
    rdd_3
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .toDF(schema=["col_1", "col_2", "col_3"])
)
df_2.show()

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  a_1|    1|
|    a|  a_1|    2|
|    a|  a_2|    1|
|    a|  a_2|    2|
|    a|  b_1|    1|
|    a|  b_2|    1|
+-----+-----+-----+



In [16]:
!hdfs dfs -rmr /user/root/01_multiply_data_output

rmr: DEPRECATED: Please use '-rm -r' instead.
Deleted /user/root/01_multiply_data_output


In [17]:
# Write one text line per replicated row; Spark emits one part-NNNNN file per partition.
rdd_4.saveAsTextFile(path="hdfs:///user/root/01_multiply_data_output")

In [18]:
# Inspect what was written: a _SUCCESS marker plus the part files.
!hdfs dfs -ls /user/root/01_multiply_data_output

Found 5 items
-rw-r--r--   1 root supergroup          0 2019-11-09 16:58 /user/root/01_multiply_data_output/_SUCCESS
-rw-r--r--   1 root supergroup         16 2019-11-09 16:58 /user/root/01_multiply_data_output/part-00000
-rw-r--r--   1 root supergroup         16 2019-11-09 16:58 /user/root/01_multiply_data_output/part-00001
-rw-r--r--   1 root supergroup          8 2019-11-09 16:58 /user/root/01_multiply_data_output/part-00002
-rw-r--r--   1 root supergroup          8 2019-11-09 16:58 /user/root/01_multiply_data_output/part-00003


In [19]:
!hdfs dfs -cat /user/root/01_multiply_data_output/part-0000[0-3]

a	a_1	1
a	a_1	2
a	a_2	1
a	a_2	2
a	b_1	1
a	b_2	1


In [20]:
# Shut down the SparkSession created earlier.
# NOTE(review): SparkSession.stop() is documented to also stop the underlying
# SparkContext, which would make the next cell a no-op — confirm for this Spark version.
ss.stop()

In [21]:
# Release the SparkContext created at the top of the notebook.
sc.stop()