In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row

In [2]:
import pandas as pd
import numpy as np

In [3]:
conf = SparkConf()
conf.setMaster('spark://spark:7077')
conf.set('spark.authenticate', False)
sc = SparkContext(conf=conf)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/08 07:12:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# define a Python data structure
big_list = range(1000000)

In [5]:
# Distribute a local Python data structure to form an RDD. Using range is recommended if the input represents a range for performance.
# Distribute that data into 2 partitions
rdd = sc.parallelize(big_list, 2)

In [6]:
# Return a new RDD containing only the elements that satisfy a predicate (odd numbers).
odds = rdd.filter(lambda x: x % 2 != 0)

In [7]:
# Take the first 10 elements of the RDD.
# It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
# This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
odds.take(10)

                                                                                

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

In [8]:
# Persist this RDD with the default storage level (MEMORY_ONLY).
# Take the first 10 elements of the cached RDD.
odds.cache().take(10)

                                                                                

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

In [9]:
# Generate some random numbers
df = pd.DataFrame(np.random.randint(0, 999, size=(1000000, 2)), columns=list('AB'))

In [10]:
df.to_csv('test.csv', index=False)

In [11]:
df

Unnamed: 0,A,B
0,505,309
1,227,229
2,554,669
3,353,84
4,100,495
...,...,...
999995,710,783
999996,720,326
999997,759,293
999998,702,311


In [12]:
spark = SparkSession.builder.getOrCreate()

In [13]:
spark

In [14]:
df_pyspark = spark.read.csv('test.csv', header=True, inferSchema=True)

                                                                                

In [15]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [16]:
df_pyspark.show()

+---+---+
|  A|  B|
+---+---+
|505|309|
|227|229|
|554|669|
|353| 84|
|100|495|
|838|783|
|812|302|
| 14|573|
|638|418|
|997|941|
|219|526|
|299| 75|
|480| 51|
|795|713|
|658|179|
|856|896|
|530|409|
|387|723|
|375|819|
|219|913|
+---+---+
only showing top 20 rows



In [17]:
df_pyspark.head(15)

[Row(A=505, B=309),
 Row(A=227, B=229),
 Row(A=554, B=669),
 Row(A=353, B=84),
 Row(A=100, B=495),
 Row(A=838, B=783),
 Row(A=812, B=302),
 Row(A=14, B=573),
 Row(A=638, B=418),
 Row(A=997, B=941),
 Row(A=219, B=526),
 Row(A=299, B=75),
 Row(A=480, B=51),
 Row(A=795, B=713),
 Row(A=658, B=179)]

In [23]:
df_pyspark.printSchema()

root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)



In [24]:
df_pyspark.select('A').show()

+---+
|  A|
+---+
|970|
|617|
|  5|
|796|
|984|
|189|
| 82|
|818|
|750|
|591|
|513|
|790|
|958|
|678|
|521|
|678|
|372|
|372|
|590|
| 17|
+---+
only showing top 20 rows



In [25]:
df_pyspark.filter("a<4").show()

+---+---+
|  A|  B|
+---+---+
|  2|157|
|  3|465|
|  1| 82|
|  3|729|
|  2|574|
|  2|621|
|  0|563|
|  2|948|
|  0|335|
|  2| 47|
|  1|614|
|  1|836|
|  2|971|
|  1|511|
|  2|921|
|  3|648|
|  1|409|
|  0|104|
|  1|800|
|  1|374|
+---+---+
only showing top 20 rows



In [26]:
# Register the DataFrame as a global temporary view
df_pyspark.createGlobalTempView("test")

In [27]:
spark.sql("SELECT * FROM global_temp.test WHERE A<4").show()

+---+---+
|  A|  B|
+---+---+
|  2|157|
|  3|465|
|  1| 82|
|  3|729|
|  2|574|
|  2|621|
|  0|563|
|  2|948|
|  0|335|
|  2| 47|
|  1|614|
|  1|836|
|  2|971|
|  1|511|
|  2|921|
|  3|648|
|  1|409|
|  0|104|
|  1|800|
|  1|374|
+---+---+
only showing top 20 rows



In [28]:
spark.sql("SELECT COUNT (A) FROM global_temp.test WHERE A<4").show()



+--------+
|count(A)|
+--------+
|    4068|
+--------+



                                                                                

In [29]:
rdd = spark.sql("SELECT * FROM global_temp.test WHERE A<4").rdd

In [32]:
new_rdd = rdd.map(lambda w: Row(A=w[0], B=w[1]+10000))

In [33]:
new_df = spark.createDataFrame(new_rdd)

                                                                                

In [34]:
new_df.show()

[Stage 15:>                                                         (0 + 1) / 1]

+---+-----+
|  A|    B|
+---+-----+
|  2|10157|
|  3|10465|
|  1|10082|
|  3|10729|
|  2|10574|
|  2|10621|
|  0|10563|
|  2|10948|
|  0|10335|
|  2|10047|
|  1|10614|
|  1|10836|
|  2|10971|
|  1|10511|
|  2|10921|
|  3|10648|
|  1|10409|
|  0|10104|
|  1|10800|
|  1|10374|
+---+-----+
only showing top 20 rows



                                                                                

In [39]:
digits = new_rdd.map(lambda w: Row(A=w[0], B=','.join([str(i) for i in str(w[1])])))

In [40]:
digits_df = spark.createDataFrame(digits)

                                                                                

In [41]:
digits_df.show()

[Stage 20:>                                                         (0 + 1) / 1]

+---+---------+
|  A|        B|
+---+---------+
|  2|1,0,1,5,7|
|  3|1,0,4,6,5|
|  1|1,0,0,8,2|
|  3|1,0,7,2,9|
|  2|1,0,5,7,4|
|  2|1,0,6,2,1|
|  0|1,0,5,6,3|
|  2|1,0,9,4,8|
|  0|1,0,3,3,5|
|  2|1,0,0,4,7|
|  1|1,0,6,1,4|
|  1|1,0,8,3,6|
|  2|1,0,9,7,1|
|  1|1,0,5,1,1|
|  2|1,0,9,2,1|
|  3|1,0,6,4,8|
|  1|1,0,4,0,9|
|  0|1,0,1,0,4|
|  1|1,0,8,0,0|
|  1|1,0,3,7,4|
+---+---------+
only showing top 20 rows



                                                                                

In [46]:
# Run MapReduce
counts = digits_df.select('B').rdd.flatMap(lambda line: line.B.split(",")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

In [47]:
counts_df = spark.createDataFrame(counts)

                                                                                

In [49]:
# the number of occurences for each number 0-9
counts_df.show()

+---+----+
| _1|  _2|
+---+----+
|  1|5283|
|  0|5303|
|  4|1262|
|  8|1238|
|  9|1218|
|  5|1209|
|  7|1200|
|  6|1194|
|  2|1208|
|  3|1225|
+---+----+



In [34]:
df_pyspark.toPandas().to_dict('list')

                                                                                

{'A': [505,
  227,
  554,
  353,
  100,
  838,
  812,
  14,
  638,
  997,
  219,
  299,
  480,
  795,
  658,
  856,
  530,
  387,
  375,
  219,
  622,
  23,
  347,
  921,
  667,
  468,
  120,
  977,
  111,
  920,
  339,
  716,
  962,
  833,
  211,
  269,
  743,
  425,
  850,
  823,
  270,
  833,
  24,
  135,
  414,
  269,
  764,
  605,
  179,
  429,
  896,
  870,
  781,
  442,
  443,
  809,
  493,
  111,
  550,
  59,
  742,
  73,
  394,
  70,
  263,
  451,
  617,
  655,
  380,
  220,
  620,
  636,
  402,
  831,
  754,
  64,
  435,
  318,
  772,
  164,
  116,
  165,
  74,
  408,
  960,
  231,
  745,
  292,
  419,
  94,
  679,
  699,
  721,
  828,
  525,
  95,
  457,
  528,
  632,
  156,
  384,
  923,
  389,
  798,
  397,
  768,
  818,
  829,
  278,
  890,
  479,
  141,
  254,
  833,
  806,
  691,
  68,
  473,
  382,
  187,
  954,
  435,
  401,
  621,
  964,
  364,
  558,
  685,
  180,
  79,
  897,
  907,
  724,
  250,
  559,
  838,
  501,
  298,
  892,
  496,
  771,
  622,
  8,
  739,
 