#### PySpark Tutorial

In [None]:
#docker command
docker run -it \
  -v ./work:/home/jovyan/work \
  --user root \
  -e CHOWN_HOME=yes \
  -e CHOWN_HOME_OPTS='-R' \
  -p 8888:8888 \
  jupyter/pyspark-notebook

In [1]:
import pyspark

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS']

'--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'

In [3]:
spark = pyspark.sql.SparkSession.builder.appName("SparkPlayground").getOrCreate()
print(spark.version)

3.5.0


In [4]:
rdd = spark.sparkContext.parallelize(range(1000))
print("Count: ", rdd.count())

Count:  1000


In [10]:
# Create RDD from parallelize    
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("Python", 30000), ("C++", 100000), ("Java", 50000)]
rdd=spark.sparkContext.parallelize(dataList)

In [11]:
# Examples of RDD operations
print("RDD operations")
print("Count: ", rdd.count())
print("First: ", rdd.first())
print("Take: ", rdd.take(2))
print("Collect: ", rdd.collect())

RDD operations
Count:  6
First:  ('Java', 20000)
Take:  [('Java', 20000), ('Python', 100000)]
Collect:  [('Java', 20000), ('Python', 100000), ('Scala', 3000), ('Python', 30000), ('C++', 100000), ('Java', 50000)]


In [12]:
# Examples of RDD transformations
print("RDD transformations")
print("Map: ", rdd.map(lambda x: (x[0], x[1] * 2)).collect())
print("Filter: ", rdd.filter(lambda x: "P" in x[0]).collect())
print("FlatMap: ", rdd.flatMap(lambda x: (x[0], x[1] * 2)).collect())
print("GroupByKey: ", rdd.groupByKey().mapValues(list).collect())
print("ReduceByKey: ", rdd.reduceByKey(lambda x, y: x + y).collect())
print("SortByKey: ", rdd.sortByKey().collect())

RDD transformations
Map:  [('Java', 40000), ('Python', 200000), ('Scala', 6000), ('Python', 60000), ('C++', 200000), ('Java', 100000)]
Filter:  [('Python', 100000), ('Python', 30000)]
FlatMap:  ['Java', 40000, 'Python', 200000, 'Scala', 6000, 'Python', 60000, 'C++', 200000, 'Java', 100000]
GroupByKey:  [('Scala', [3000]), ('C++', [100000]), ('Java', [20000, 50000]), ('Python', [100000, 30000])]
ReduceByKey:  [('Scala', 3000), ('C++', 100000), ('Java', 70000), ('Python', 130000)]
SortByKey:  [('C++', 100000), ('Java', 20000), ('Java', 50000), ('Python', 100000), ('Python', 30000), ('Scala', 3000)]


In [13]:
# Dataframe
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [16]:
# Show first elements
print("Show first elements:")
print(df.show(5))

Show first elements:
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

None


In [17]:
# DF operations
print("DF operations")
print("Count: ", df.count())
print("First: ", df.first())
print("Take: ", df.take(2))
print("Collect: ", df.collect())

DF operations
Count:  5
First:  Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000)
Take:  [Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000), Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000)]
Collect:  [Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000), Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000), Row(firstname='Robert', middlename='', lastname='Williams', dob='1978-09-05', gender='M', salary=4000), Row(firstname='Maria', middlename='Anne', lastname='Jones', dob='1967-12-01', gender='F', salary=4000), Row(firstname='Jen', middlename='Mary', lastname='Brown', dob='1980-02-17', gender='F', salary=-1)]


In [19]:
# PySpark SQL simple select
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [22]:
# PySpark SQL M/F stats
groupDF = spark.sql("SELECT gender, count(*) from people group by gender")
groupDF.show()

+------+--------+
|gender|count(1)|
+------+--------+
|     M|       3|
|     F|       2|
+------+--------+



In [5]:
# Reading from kafka topic
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "pyspark").load()

In [11]:
# # Writing to kafka topic the same message with its index
# df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
#     .writeStream \
#     .format("kafka") \
#     .outputMode("append") \
#     .option("kafka.bootstrap.servers", "localhost:9092") \
#     .option("topic", "pyspark-write") \
#     .option("checkpointLocation", "/tmp/kafka") \
#     .start() \
#     .awaitTermination()