In [1]:
# Testing pyspark installation

import findspark

# findspark.init() adds the Spark installation's Python libraries to sys.path,
# so it has to run before pyspark is imported.
findspark.init()

import pyspark
findspark.find()

'C:\\Spark\\spark-3.1.2-bin-hadoop3.2'

In [2]:
# Initiate the Spark context

from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

# Local, single-threaded Spark application used for the examples below.
conf = SparkConf().setAppName('sparkApp').setMaster('local')

# getOrCreate() reuses a context/session that is already running, so this cell
# can be re-run without raising "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
spark

In [4]:
# Example Test Code

numeric_val=sc.parallelize([1,2,3,4])

In [5]:
numeric_val.map(lambda x:x**3).collect()

[1, 8, 27, 64]

In [6]:
# Stop the SparkContext backing the session

sc.stop()

In [7]:
spark=SparkSession.builder.appName('practice').getOrCreate()

In [8]:
spark

In [9]:
spark.stop()

#### PySpark DataFrame

In [10]:
spark=SparkSession.builder.appName('DataFrame').getOrCreate()

In [11]:
spark

In [12]:
df_pyspark=spark.read.csv('homeprices.csv')

df_pyspark.show()

+----+--------+---+------+
| _c0|     _c1|_c2|   _c3|
+----+--------+---+------+
|area|bedrooms|age| price|
|2600|       3| 20|550000|
|3000|       4| 15|565000|
|3200|    null| 18|610000|
|3600|       3| 30|595000|
|4000|       5|  8|760000|
|4100|       6|  8|810000|
+----+--------+---+------+



In [13]:
df_pyspark=spark.read.csv('homeprices.csv',header=True)

df_pyspark.show()

+----+--------+---+------+
|area|bedrooms|age| price|
+----+--------+---+------+
|2600|       3| 20|550000|
|3000|       4| 15|565000|
|3200|    null| 18|610000|
|3600|       3| 30|595000|
|4000|       5|  8|760000|
|4100|       6|  8|810000|
+----+--------+---+------+



In [14]:
# Check the schema

df_pyspark.printSchema()

root
 |-- area: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- age: string (nullable = true)
 |-- price: string (nullable = true)



In [15]:
# Enable inferSchema so column types are inferred instead of all being read as strings

df_pyspark=spark.read.csv('homeprices.csv',inferSchema=True,header=True)

df_pyspark.show()

+----+--------+---+------+
|area|bedrooms|age| price|
+----+--------+---+------+
|2600|       3| 20|550000|
|3000|       4| 15|565000|
|3200|    null| 18|610000|
|3600|       3| 30|595000|
|4000|       5|  8|760000|
|4100|       6|  8|810000|
+----+--------+---+------+



In [16]:
# Check the schema

df_pyspark.printSchema()

root
 |-- area: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- price: integer (nullable = true)



In [17]:
spark.stop()

#### RDD Transformation and Actions

In [18]:
%%writefile example.txt
first 
second line
the third line
then a fourth line

Overwriting example.txt


In [19]:
# getOrCreate() returns the running context if there is one, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [20]:
sc.textFile('example.txt')

example.txt MapPartitionsRDD[1] at textFile at <unknown>:0

In [21]:
text_rdd=sc.textFile('example.txt')

In [22]:
# Transformation

words=text_rdd.map(lambda line:line.split())

In [23]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [24]:
text_rdd.collect()

['first ', 'second line', 'the third line', 'then a fourth line']

In [25]:
# map vs flatMap

text_rdd.flatMap(lambda line:line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

In [26]:
%%writefile service.txt
#EventId   TimeStamp    Customer   State   ServiceId   Amount
201        10/13/2017    100        NY        131       100.00
204        10/18/2017    700        TX        129       450.00
202        10/15/2017    203        CA        121       200.00
206        10/19/2017    202        CA        131       500.00
203        10/17/2017    101        NY        173       750.00
205        10/19/2017    202        TX        121       200.00

Overwriting service.txt


In [27]:
services=sc.textFile('service.txt')

In [28]:
services.take(2)

['#EventId   TimeStamp    Customer   State   ServiceId   Amount',
 '201        10/13/2017    100        NY        131       100.00']

In [29]:
services.map(lambda line:line.split()).take(3)

[['#EventId', 'TimeStamp', 'Customer', 'State', 'ServiceId', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [30]:
# Drop the leading '#' from the header line; startswith() is safe on empty lines,
# whereas indexing line[0] would raise IndexError.
services.map(lambda line: line[1:] if line.startswith('#') else line).collect()

['EventId   TimeStamp    Customer   State   ServiceId   Amount',
 '201        10/13/2017    100        NY        131       100.00',
 '204        10/18/2017    700        TX        129       450.00',
 '202        10/15/2017    203        CA        121       200.00',
 '206        10/19/2017    202        CA        131       500.00',
 '203        10/17/2017    101        NY        173       750.00',
 '205        10/19/2017    202        TX        121       200.00']

In [31]:
# Drop the leading '#' from the header line; startswith() is safe on empty lines,
# whereas indexing line[0] would raise IndexError.
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [32]:
# Build `clean` from `services` rather than from itself, so re-running this cell
# does not try to call .split() on rows that are already lists.
clean = (
    services
    .map(lambda line: line[1:] if line.startswith('#') else line)  # strip header marker
    .map(lambda line: line.split())                                # whitespace-separated fields
)
clean.collect()

[['EventId', 'TimeStamp', 'Customer', 'State', 'ServiceId', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [33]:
pairs= clean.map(lambda lst: (lst[3],lst[-1]))

In [34]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [35]:
# Sum the amounts per state.
# NOTE: the reducer only runs for keys that have two or more values, so a key
# that occurs once keeps its original string value instead of a float.
rekey=pairs.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))

In [36]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [37]:
# Grab (State, Amount) pairs
step1 = clean.map(lambda lst: (lst[3], lst[-1]))

# Get rid of the ('State', 'Amount') header pair before touching the values
step2 = step1.filter(lambda pair: pair[0] != 'State')

# Convert every amount to float first, then reduce by key. Converting inside the
# reducer would leave a state with a single record as a string, and sorting
# would then fail comparing str with float.
step3 = step2.mapValues(float).reduceByKey(lambda amt1, amt2: amt1 + amt2)

# Sort result by amount, largest first
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False)

# Perform the action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [38]:
def func1(lst):
    """Return the last element of ``lst``.

    Uses negative indexing, so an empty list raises IndexError.
    """
    last_item = lst[-1]
    return last_item

In [39]:
x=['ID','State','Amount']

In [40]:
func1(x)

'Amount'

In [41]:
def func2(lst):
    """Return the amount from a three-item ``(id, state, amount)`` record.

    The record is unpacked positionally, so it must contain exactly three
    items; any other length raises ValueError.
    """
    # Underscores mark the unused fields and avoid shadowing the built-in id().
    _, _, amt = lst
    return amt

In [42]:
func2(x)

'Amount'

#### Groupby

In [43]:
clean.collect()

[['EventId', 'TimeStamp', 'Customer', 'State', 'ServiceId', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [44]:
# Remove the header row explicitly before grouping by state (column 3).
# Slicing the result with [1:] would only work if the header group happened to
# come back first, which groupBy does not guarantee.
grp = clean.filter(lambda x: x[3] != 'State').groupBy(lambda x: x[3])
[(k, list(v)) for (k, v) in grp.take(5)]

[('NY',
  [['201', '10/13/2017', '100', 'NY', '131', '100.00'],
   ['203', '10/17/2017', '101', 'NY', '173', '750.00']]),
 ('TX',
  [['204', '10/18/2017', '700', 'TX', '129', '450.00'],
   ['205', '10/19/2017', '202', 'TX', '121', '200.00']]),
 ('CA',
  [['202', '10/15/2017', '203', 'CA', '121', '200.00'],
   ['206', '10/19/2017', '202', 'CA', '131', '500.00']])]

#### Getting Sample

In [45]:
clean.collect()

[['EventId', 'TimeStamp', 'Customer', 'State', 'ServiceId', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [46]:
clean.sample(False,0.4).collect()

[['EventId', 'TimeStamp', 'Customer', 'State', 'ServiceId', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [47]:
clean.sample(False,0.4).collect()

[['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [48]:
# A fixed seed value returns the same set of values on every run

clean.sample(False,0.4,1).collect()

[['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [49]:
clean.sample(False,0.4,1).collect()

[['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

#### DataFrame Operations

In [50]:
# conf=SparkConf().setAppName('sparkApp').setMaster('local')
# sc=SparkContext(conf=conf)
# spark=SparkSession(sc)

In [51]:
spark=SparkSession.builder.appName('DataFrame').getOrCreate()

In [52]:
df=spark.read.csv('salaries.csv',header=True,inferSchema=True)
df.collect()

[Row(company='google', job='sales executive', degree='bachelors', salary_more_then_100k=0),
 Row(company='google', job='sales executive', degree='masters', salary_more_then_100k=0),
 Row(company='google', job='business manager', degree='bachelors', salary_more_then_100k=1),
 Row(company='google', job='business manager', degree='masters', salary_more_then_100k=1),
 Row(company='google', job='computer programmer', degree='bachelors', salary_more_then_100k=0),
 Row(company='google', job='computer programmer', degree='masters', salary_more_then_100k=1),
 Row(company='abc pharma', job='sales executive', degree='masters', salary_more_then_100k=0),
 Row(company='abc pharma', job='computer programmer', degree='bachelors', salary_more_then_100k=0),
 Row(company='abc pharma', job='business manager', degree='bachelors', salary_more_then_100k=0),
 Row(company='abc pharma', job='business manager', degree='masters', salary_more_then_100k=1),
 Row(company='facebook', job='sales executive', degree='ba

In [53]:
df.show()

+----------+-------------------+---------+---------------------+
|   company|                job|   degree|salary_more_then_100k|
+----------+-------------------+---------+---------------------+
|    google|    sales executive|bachelors|                    0|
|    google|    sales executive|  masters|                    0|
|    google|   business manager|bachelors|                    1|
|    google|   business manager|  masters|                    1|
|    google|computer programmer|bachelors|                    0|
|    google|computer programmer|  masters|                    1|
|abc pharma|    sales executive|  masters|                    0|
|abc pharma|computer programmer|bachelors|                    0|
|abc pharma|   business manager|bachelors|                    0|
|abc pharma|   business manager|  masters|                    1|
|  facebook|    sales executive|bachelors|                    1|
|  facebook|    sales executive|  masters|                    1|
|  facebook|   business m

In [54]:
# columns

df.columns

['company', 'job', 'degree', 'salary_more_then_100k']

In [55]:
# ordering

df.orderBy('degree',ascending=False).toPandas()

Unnamed: 0,company,job,degree,salary_more_then_100k
0,google,sales executive,masters,0
1,google,business manager,masters,1
2,google,computer programmer,masters,1
3,abc pharma,sales executive,masters,0
4,abc pharma,business manager,masters,1
5,facebook,sales executive,masters,1
6,facebook,business manager,masters,1
7,facebook,computer programmer,masters,1
8,google,sales executive,bachelors,0
9,google,business manager,bachelors,1


In [56]:
dff=spark.read.csv('homeprices.csv',header=True,inferSchema=True)
dff.show()

+----+--------+---+------+
|area|bedrooms|age| price|
+----+--------+---+------+
|2600|       3| 20|550000|
|3000|       4| 15|565000|
|3200|    null| 18|610000|
|3600|       3| 30|595000|
|4000|       5|  8|760000|
|4100|       6|  8|810000|
+----+--------+---+------+



In [57]:
type(dff)

pyspark.sql.dataframe.DataFrame

#### Creating DataFrame using pyspark.sql.Row()

In [58]:
# Creating Rows

In [59]:
from pyspark.sql import Row,DataFrame

In [60]:
# Row template: calling Employee(...) with four positional values builds a Row
# with these field names.
# NOTE(review): 'lasetName' looks like a typo for 'lastName'; it becomes the field
# name of every Row built from this template — confirm before renaming.
Employee=Row('firstName','lasetName','email','salary')

In [61]:
employee1=Employee('Aswin','M','m7aswin@gmail.com',10000)
employee2=Employee('Raghav','Naik','raghav@gmail.com',40000)
employee3=Employee('Sanal','Kumar','sanal@gmail.com',35000)
employee4=Employee('Binu','Jayaraj','binu@gmail.com',60000)
employee5=Employee('Jose','Baby','baby@gmail.com',60000)

In [62]:
department1=Row(id='1234',name='HR')
department2=Row(id='2314',name='DEV')
department3=Row(id='3412',name='FN')
department4=Row(id='4123',name='OPS')

In [63]:
employee2

Row(firstName='Raghav', lasetName='Naik', email='raghav@gmail.com', salary=40000)

In [64]:
department2

Row(id='2314', name='DEV')

In [65]:
# Creating DataFrame with Rows

In [66]:
departmentwithEmployee1=Row(department=department1,employee=[employee3])
departmentwithEmployee2=Row(department=department2,employee=[employee2,employee1])
departmentwithEmployee3=Row(department=department4,employee=[employee4,employee5])

In [67]:
departmentwithEmployee2

Row(department=Row(id='2314', name='DEV'), employee=[Row(firstName='Raghav', lasetName='Naik', email='raghav@gmail.com', salary=40000), Row(firstName='Aswin', lasetName='M', email='m7aswin@gmail.com', salary=10000)])

In [68]:
departmentwithEmployee1

Row(department=Row(id='1234', name='HR'), employee=[Row(firstName='Sanal', lasetName='Kumar', email='sanal@gmail.com', salary=35000)])

In [69]:
# Build a DataFrame from the nested department/employee Rows.
# NOTE(review): departmentwithEmployee3 is defined earlier but is not included in
# this list — confirm whether leaving it out is intentional.
departmentEmp_Seq=[departmentwithEmployee1,departmentwithEmployee2]
dframe=spark.createDataFrame(departmentEmp_Seq)

In [70]:
display(dframe)

DataFrame[department: struct<id:string,name:string>, employee: array<struct<firstName:string,lasetName:string,email:string,salary:bigint>>]

In [71]:
type(dframe)

pyspark.sql.dataframe.DataFrame

In [72]:
dframe.show()

+-----------+--------------------+
| department|            employee|
+-----------+--------------------+
| {1234, HR}|[{Sanal, Kumar, s...|
|{2314, DEV}|[{Raghav, Naik, r...|
+-----------+--------------------+

