Topics Covered:
    1. Creating Spark SQL Session
    2. Creating Spark Context
    3. Using RDD, map and take
    4. Reading text file and applying actions
    5. Examples: Transformations
    6. Connecting to MySQL
    7. Read and Write: CSV
    8. Read and Write: Parquet
    9. SQL on dataframes.
    10. DataFrame operations

# 00. Getting Started

In [2]:
!pip install -q findspark

In [3]:
import findspark
# Must run before the pyspark imports further down.
# NOTE(review): findspark is expected to locate the local Spark install and put pyspark on sys.path - see the findspark docs.
findspark.init()

# 01. Creating a Spark SQL Session

In [5]:
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
#spark.stop()

# 02. Creating a Spark Context

In [4]:
#import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
# Configuration for a local master with the application name "spark-basic".
conf = SparkConf().setMaster('local')
conf.setAppName('spark-basic')  # the setter returns the conf, so the cell still echoes it

<pyspark.conf.SparkConf at 0x14ea545d748>

In [5]:
# Use the SparkConf built above (master 'local', app name 'spark-basic') instead of repeating
# the values, and keep the cell re-runnable: SparkContext(...) raises when a context is already
# active, whereas getOrCreate() hands back the existing one.
sc = SparkContext.getOrCreate(conf=conf)

# 03. Using RDD, map and take

In [5]:
# parallelize -> map are lazy; take(10) is the action that runs them.
# take() returns a plain Python list (not an RDD), so the variable is named for what it holds.
first_ten_pairs = sc.parallelize(range(1000)).map(lambda n: (n, n % 2)).take(10)
print(first_ten_pairs)

[(0, 0), (1, 1), (2, 0), (3, 1), (4, 0), (5, 1), (6, 0), (7, 1), (8, 0), (9, 1)]


# 04. Reading text file and applying actions

Reading a text file and using the built-in RDD actions:
count, collect, first, take, takeSample

In [6]:
# Reading a text file: the result is an RDD, as the type() check below shows.
txt = sc.textFile(name='hello.txt')
type(txt)

pyspark.rdd.RDD

In [7]:
# Each call below is an action that brings results back to the driver; txt itself stays an RDD.
# collect() returns every line as a Python list, so only its length is printed here.
print("collect(): ", len(txt.collect()), "lines")
print("first(): ",txt.first())
print("count(): ",txt.count())
print("take(): ",txt.take(5))
# takeSample(withReplacement, num, seed): 5 lines drawn without replacement, fixed seed 1.
print("takeSample(): ",txt.takeSample(False,5,1))


first():  Line 1
count():  1244
take():  ['Line 1', 'Line 2', 'Line 3', 'Line 4', 'Line 5']
takeSample():  ['Last Line it is.', 'This is line number two', 'Welcome to PySpark!!', 'Last Line it is.', 'Line 3']


# 05. Examples: Transformations

Transformations: map, flatMap, filter, mapPartitions, mapPartitionsWithIndex, sample, union, intersection, distinct, groupBy, keyBy, zip, zipWithIndex, coalesce, repartition, sortBy

In [8]:
# Using map: apply a function to every element (here: append "2" to each colour name).
x = sc.parallelize(["Red","Green","Blue","Yellow"])
y = x.map(lambda colour: colour + "2")
y.collect()

['Red2', 'Green2', 'Blue2', 'Yellow2']

In [9]:
# Using flatMap: every element n expands to range(1, n); the pieces are flattened into one RDD.
x = sc.parallelize([2, 3, 4])
expanded = x.flatMap(lambda n: range(1, n))
y = expanded.collect()
print(y)

[1, 1, 2, 1, 2, 3]


In [10]:
# Using filter: keep only the elements for which the predicate is true (the even numbers).
x = sc.parallelize([2,5,6,8,1,5,8,9,6,3])
evens = x.filter(lambda n: n % 2 == 0)
y = evens.collect()
y

[2, 6, 8, 8, 6]

In [11]:
# sample(withReplacement, fraction, seed): a random subset of the RDD.
# `fraction` is an expected share, not an exact size - the first call below returns
# fewer than 0.8 * 9 elements; fraction=1 returns everything.
x = sc.parallelize(range(1, 10))
print(x.sample(withReplacement=False, fraction=0.8, seed=2).collect())
print(x.sample(withReplacement=False, fraction=1, seed=2).collect())

[2, 5, 6, 8]
[1, 2, 3, 4, 5, 6, 7, 8, 9]


In [12]:
# Union & intersection: union keeps duplicates (5..8 appear twice);
# intersection returns each common element once, in no particular order.
x = sc.parallelize(range(1, 9))
y = sc.parallelize(range(5, 15))
z, z2 = x.union(y).collect(), x.intersection(y).collect()
print("Union: ",z," Intersection: ",z2)

Union:  [1, 2, 3, 4, 5, 6, 7, 8, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]  Intersection:  [6, 8, 5, 7]


In [13]:
# sortBy: order the (letter, number) pairs by the whole tuple, ascending.
# The same call on plain numbers:
#   x = sc.parallelize([2,8,9,5,4,7,1,1,2,3])
#   x.sortBy(lambda x:x,True).collect()
y = sc.parallelize([("F",222),("Z",28),("I",15),("D",10)])
y.sortBy(keyfunc=lambda pair: pair, ascending=True).collect()

[('D', 10), ('F', 222), ('I', 15), ('Z', 28)]

In [14]:
# mapPartitions: the function is called once per partition with an iterator over its elements.
x = sc.parallelize([1, 2, 3, 4, 5, 6], 2)


def partition_sum(cluster):
    """Yield the sum of the elements of one partition.

    Given a descriptive name of its own so that it cannot be silently shadowed by
    another generic helper defined elsewhere in the notebook.
    """
    yield sum(cluster)


# Two partitions -> two sums.
x.mapPartitions(partition_sum).collect()

[6, 15]

In [15]:
# mapPartitionsWithIndex: like mapPartitions, but the function also receives the partition index.
x = sc.parallelize([1, 2, 3, 4, 5, 6], 5)


def partition_index(index, cluster):
    """Yield only the index of the partition; the partition's elements are ignored.

    Given a descriptive name of its own so that it does not replace (or get replaced by)
    another generic helper defined elsewhere in the notebook.
    """
    yield index


# Five partitions -> indices 0..4, whose sum is 10.
x.mapPartitionsWithIndex(partition_index).sum()

10

In [16]:
# groupBy: bucket the numbers by parity (key 0 = even, key 1 = odd).
rdd = sc.parallelize([1,1,2,3,5,8])
result = rdd.groupBy(lambda n: n % 2).collect()
# Sort the keys and the members of each group so the displayed result is stable.
sorted((parity, sorted(members)) for parity, members in result)

[(0, [2, 8]), (1, [1, 1, 3, 5])]

In [17]:
# keyBy: turn 0..2 into (n*n, n) pairs, then cogroup them with y's (k, k) pairs.
# For every key the result holds two lists: the values from x and the values from y.
x = sc.parallelize(range(0,3)).keyBy(lambda n: n * n)
y = sc.parallelize(zip(range(0,5), range(0,5)))
cogrouped = sorted(x.cogroup(y).collect())
[(key, [list(values) for values in groups]) for key, groups in cogrouped]

[(0, [[0], [0]]),
 (1, [[1], [1]]),
 (2, [[], [2]]),
 (3, [[], [3]]),
 (4, [[2], [4]])]

In [18]:
# zip: pair the elements of two RDDs position by position.
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000,1005))
pairs = x.zip(y)
pairs.collect()

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

In [19]:
# zipWithIndex: pair every element with its position in the RDD.
letters = sc.parallelize(["a","b","c","d"])
letters.zipWithIndex().collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

In [20]:
# partitioning: glom() turns each partition into a list, showing how the 7 elements
# are spread over the 4 requested partitions.
rdd = sc.parallelize([1,2,3,4,5,6,7], numSlices=4)
rdd.glom().collect()

[[1], [2, 3], [4, 5], [6, 7]]

In [21]:
# repartition: redistribute the same elements over 2 partitions.
rdd.repartition(numPartitions=2).glom().collect()

[[1, 4, 5, 6, 7], [2, 3]]

In [22]:
# Coalesce, before: five elements spread over 3 partitions.
sc.parallelize([1,2,3,4,5], numSlices=3).glom().collect()

[[1], [2, 3], [4, 5]]

In [23]:
# Coalesce, after: the same data reduced to 2 partitions.
sc.parallelize([1,2,3,4,5], numSlices=3).coalesce(numPartitions=2).glom().collect()

[[1], [2, 3, 4, 5]]

# 06. Connecting to MySQL

In [36]:
import os

from pyspark.sql import SparkSession

# Connection settings for the local MySQL server.
database = "sonuresodb"
table = "lyrics"
user = "root"
# Prefer a password from the environment over one stored in the notebook;
# falls back to the empty password used so far.
password = os.environ.get("MYSQL_PASSWORD", "")

# `spark` is only created in a later section of this notebook, so obtain (or reuse)
# a session here; otherwise this cell fails on a fresh top-to-bottom run.
spark = SparkSession.builder.getOrCreate()

# The URL scheme has to match the driver: MySQL uses jdbc:mysql://host:port/database.
# (jdbc:sqlserver://...;databaseName=... is SQL Server syntax and is rejected by the MySQL driver.)
# NOTE(review): the MySQL Connector/J jar must be on Spark's classpath; newer Connector/J
# versions name the driver class com.mysql.cj.jdbc.Driver - confirm against the installed jar.
jdbcDF = (
    spark.read.format("jdbc")
    .option("url", f"jdbc:mysql://localhost:3306/{database}")
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.mysql.jdbc.Driver")
    .load()
)

# 07. Read and Write: CSV

1. Reading csv with SQL spark: spark.read.csv(filename)
    1. Reading one csv file:                  spark.read.csv("file1.csv",header=True)
    2. Reading multiple csv files:            spark.read.csv(["file1.csv","file2.csv","file3.csv"],header=True)
    3. Reading all csv file in a directory:   spark.read.csv("folder path",header=True)
    4. options: a) ('delimiter'=',') or ('sep'=',')
                b) ("inferSchema",True)
                c) ("header",True)
                d) New Schema: spark.read.csv("file.csv",schema=new_schema)
2. Writing dataframes as csv:  df.write.csv(foldername)
    1. options: a) header=True :        df.write.csv("foldernm/",header=True)
                b) delimiter='|' :      df.write.options(delimiter='|',header=True).csv("newmovies/")
    2. mode:    a) overwrite :          df.write.mode('overwrite').csv("newmovies/")
                b) append      c) ignore       d) error



In [6]:
from pyspark.sql import SparkSession

# Entry point for the DataFrame / SQL API used in the remaining sections.
# NOTE(review): if the SparkContext created earlier is still active, getOrCreate() presumably
# reuses it and the "local[*]" master requested here has no effect - confirm.
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

In [7]:
# method_1.1 to read a csv: set every reader option explicitly, then load the file.
reader = (
    spark.read
    .option("header", True)
    .option("delimiter", ",")
    .option("inferSchema", True)
)
df = reader.csv("movies.csv")

In [27]:
# method_1.2 (short method) to read a csv: the same options passed as keyword arguments.
df = spark.read.csv(
    "movies.csv",
    sep=",",
    header=True,
    inferSchema=True,
)

In [9]:
# method_2.1 to read a csv with new_schema:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType

# Explicit schema: one nullable field per column, listed in the order they are declared.
new_schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("imdb", StringType(), True),
    StructField("title", StringType(), True),
    StructField("test", StringType(), True),
    StructField("clean_test", StringType(), True),
    StructField("binary", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("domgross", StringType(), True),
    StructField("intgross", StringType(), True),
    StructField("code", StringType(), True),
    StructField("budget_2013$", DoubleType(), True),
    StructField("domgross_2013$", StringType(), True),
    StructField("intgross_2013$", StringType(), True),
    StructField("period code", IntegerType(), True),
    StructField("decade code", IntegerType(), True),
])

df = spark.read.csv("movies.csv", schema=new_schema, header=True)

In [10]:
# method_2.2 to read a csv with new_schema(better way):
def get_schema(d):
    """Build a StructType from a mapping of column name -> Spark data type.

    Fields are added in the mapping's iteration order; every field is nullable.
    """
    schema = StructType()
    for column_name, data_type in d.items():
        # add() returns the StructType itself, so rebinding keeps the same object.
        schema = schema.add(column_name, data_type, nullable=True)
    return schema

# Column name -> Spark type, one entry per column; get_schema() turns it into a StructType.
d = {
    'year': IntegerType(),
    'imdb': StringType(),
    'title': StringType(),
    'test': StringType(),
    'clean_test': StringType(),
    'binary': StringType(),
    'budget': DoubleType(),
    'domgross': StringType(),
    'intgross': StringType(),
    'code': StringType(),
    'budget_2013$': DoubleType(),
    'domgross_2013$': StringType(),
    'intgross_2013$': StringType(),
    'period code': IntegerType(),
    'decade code': IntegerType(),
}
df = spark.read.csv("movies.csv", schema=get_schema(d), header=True)

In [30]:
# method_1 to write: the target ("newmovies/") is created as a folder.
# Pipe-delimited with a header row; an existing folder is overwritten.
csv_writer = df.write.mode('overwrite').options(header=True, delimiter='|')
csv_writer.csv("newmovies/")

# 08. Read and Write: Parquet

1. Writing Parquets:
    1. method_1 : df.write.parquet("filename.parquet")
    2. method_2 : df.write.mode('overwrite').parquet("filename.parquet")
        a) mode:  overwrite, append, error, ignore
    3. method_3 : df.write.partitionBy("col1","col2").parquet("filename.parquet")
    4. 
2. Reading parquets:
    1. method_1 :  spark.read.parquet("filename.parquet")
    2. method_2 :  Partitioned Parquet: spark.read.parquet("movies_partitioned.parquet/binary=FAIL")

In [11]:
from datetime import datetime as dt
# removing special char from col_names: space and '-' become '_', ':' and '$' are dropped.
name_cleanup = str.maketrans({' ': '_', '-': '_', ':': None, '$': None})
df = df.toDF(*(name.translate(name_cleanup) for name in df.columns))

In [32]:
# method_1: write parquet: the Unix-timestamp suffix gives each run its own output path.
run_stamp = int(dt.now().timestamp())
df.write.parquet(f"movies_{run_stamp}.parquet")

In [33]:
# method_2: write parquet with overwrite mode: replaces movies.parquet if it already exists.
overwrite_writer = df.write.mode('overwrite')
overwrite_writer.parquet("movies.parquet")

In [34]:
# method_3: write parquet with partition: one sub-folder per distinct value of "binary"
# (e.g. movies_partitioned.parquet/binary=FAIL).
# The output path is fixed, so overwrite mode is needed to keep the cell re-runnable:
# with the default mode the write raises as soon as the folder exists.
df.write.mode('overwrite').partitionBy("binary").parquet("movies_partitioned.parquet")

In [35]:
# method_1: Read parquet file: load the whole dataset and report how many columns came back.
par_df = spark.read.parquet("movies.parquet")
column_count = len(par_df.columns)
column_count

15

In [36]:
# method_2: Read partitioned parquet file: point the reader at a single partition folder.
# The result has one column fewer than the full dataset, because the "binary" value
# lives in the folder name rather than in the files.
fail_partition_path = "movies_partitioned.parquet/binary=FAIL"
par_df = spark.read.parquet(fail_partition_path)
len(par_df.columns)

14

# 09. SQL on dataframes:

In [12]:
# Register df under a view name so that it can be queried with SQL.
df.createOrReplaceTempView("df_table")
pass_count_query = "select count(*) as count_pass from df_table where binary='PASS'"
spark.sql(pass_count_query).show()

+----------+
|count_pass|
+----------+
|       803|
+----------+



# 10. DataFrame Operations

In [22]:
# describe: overall info of the col: count, mean, stddev, min and max for each selected column.
summary_columns = ['budget', 'budget_2013', 'period_code']
df.select(summary_columns).describe().show()

+-------+--------------------+-------------------+------------------+
|summary|              budget|        budget_2013|       period_code|
+-------+--------------------+-------------------+------------------+
|  count|                1794|               1794|              1615|
|   mean|4.4826462614269786E7|5.546460845150502E7| 2.419814241486068|
| stddev| 4.818602611895356E7|5.491863559804196E7|1.1946197915091876|
|    min|              7000.0|             8632.0|                 1|
|    max|              4.25E8|       4.61435929E8|                 5|
+-------+--------------------+-------------------+------------------+



In [28]:
# distinct: how many different years there are, then the different values of "binary".
distinct_years = df.select('year').distinct()
print(distinct_years.count())
df.select('binary').distinct().show()

44
+------+
|binary|
+------+
|  FAIL|
|  PASS|
+------+



In [43]:
# using subtract: count of PASS in year 1982:
%time print("Using subtract of fails: ",df[df.year=='1982'].subtract(df[df.binary=='FAIL']).count())
# Same can be done efficiently as:
%time print("Directly counting pass: ",df[df.year=='1982'][df.binary=='PASS'].count())

Using subtract of fails:  3
Wall time: 1.69 s
Directly counting pass:  3
Wall time: 306 ms


In [None]:
# for further: https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
# Also: https://towardsdatascience.com/the-most-complete-guide-to-pyspark-dataframes-2702c343b2e8