## *🔗* Reference : https://www.youtube.com/@rajasdataengineering7585

In [1]:
!pip install pyspark
from pyspark import SparkContext
import numpy as np

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=5b353251e52fb8775367ab212be6a4a6415b4a01cf3efd0bbd773c7524fb92c4
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

### for older versions of Spark we use SparkContext to interact with Spark but the for >2.0 versions SparkSession is used to instead of SparkContext.

In [2]:
## creating SparkContext with 2 CPU cores
sc=SparkContext(master="local[2]")

In [3]:
lst=np.random.randint(0,10,20)
print(lst)


[6 1 2 8 3 5 6 0 4 8 9 6 2 3 3 3 2 9 3 8]


In [4]:
my_rdd=sc.parallelize(lst)
my_rdd.collect()

[6, 1, 2, 8, 3, 5, 6, 0, 4, 8, 9, 6, 2, 3, 3, 3, 2, 9, 3, 8]

In [5]:
## using glom to view how the data in RDD is spread across the 2 paritions in different Nodes
my_rdd.glom().collect()

[[6, 1, 2, 8, 3, 5, 6, 0, 4, 8], [9, 6, 2, 3, 3, 3, 2, 9, 3, 8]]

In [6]:
sc.stop()

In [7]:
#spread across the 3 paritions in different Nodes
sc2=SparkContext(master="local[3]")
my_rdd2=sc2.parallelize(lst)
my_rdd2.glom().collect()

[[6, 1, 2, 8, 3, 5], [6, 0, 4, 8, 9, 6], [2, 3, 3, 3, 2, 9, 3, 8]]

In [8]:
my_rdd2.count()

20

In [9]:
rdd_distinct=my_rdd2.distinct()
rdd_distinct.collect()

[6, 3, 0, 9, 1, 4, 2, 8, 5]

## using Reduce method to get overall aggregated values

In [10]:
rdd_distinct.reduce(lambda a,b:a+b)


38

In [11]:
rdd_distinct.reduce(lambda a,b:a if a>b else b)

9

In [12]:
words = 'These always exists a unknow thing in science to find and explore it is a continous learing process'.split(' ')
w_rdd=sc2.parallelize(words)
w_rdd.collect()

['These',
 'always',
 'exists',
 'a',
 'unknow',
 'thing',
 'in',
 'science',
 'to',
 'find',
 'and',
 'explore',
 'it',
 'is',
 'a',
 'continous',
 'learing',
 'process']

In [13]:
## map applies a function on each element and return new RDD with similar collections
a=rdd_distinct.map(lambda x:(x,x*x))
a.collect()

[(6, 36), (3, 9), (0, 0), (9, 81), (1, 1), (4, 16), (2, 4), (8, 64), (5, 25)]

In [14]:
## applies on each element but returns the entire group in form of single rdd without any collections
rdd_distinct.flatMap(lambda x:(x,x*x)).collect()

[6, 36, 3, 9, 0, 0, 9, 81, 1, 1, 4, 16, 2, 4, 8, 64, 5, 25]

In [15]:
c=w_rdd.groupBy(lambda x:len(x)).collect()
print(sorted([(key,sorted(val)) for key,val in c]))

[(1, ['a', 'a']), (2, ['in', 'is', 'it', 'to']), (3, ['and']), (4, ['find']), (5, ['These', 'thing']), (6, ['always', 'exists', 'unknow']), (7, ['explore', 'learing', 'process', 'science']), (9, ['continous'])]


In [16]:
sc2.stop()

#Now using the Spark Session to explore DataFrame

#### Spark DataFrame basics

In [17]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('Data Frame intro').getOrCreate()
df0 = spark.read.format('json').options(inferSchema=True).load('/content/sample_data/anscombe.json')


### Adding user_defined Schema similar to SQL works only with Databricks

#### uds='Series_rank STRING, X DECIMAL(10,2), Y DECIMAL(10,2)'

#### df0 = spark.read.format('json').options(inferSchema=False,).schema(uds).load('/content/sample_data/anscombe.json')




In [18]:
df0.printSchema()

root
 |-- Series: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [19]:
df0.show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  null|null| null|              [|
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [20]:
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
new_schema=StructType([StructField("name",StringType(),True),StructField("age",IntegerType(),False)])
print(new_schema)
df0.show()

StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), False)])
+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  null|null| null|              [|
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [21]:
df=spark.read.format('csv').options(inferSchema=True,header=True).load('/content/sample_data/california_housing_train.csv')
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



### Common format used to read data from different files


df= spark.read.format('filetype').options(inferSchema=True/False,sep=',',header=True/False).schema(user_defined_schema).load('<file path/ can be a folder also that has multiple files >')

Valid options for filetype:

'csv' , 'avro', 'json', 'parquet', 'orc'

Valid options:
 inferSchema=True/False

 header=True/False
 
 sep="< delimiter char >"

Incase of inferSchema set to False and user wants to provide explict schema use the schema option:

user_defined_schema= 
schema(user_defined_schema)

2 ways to define schema

using StructType()


Using SQL like schema: **(valid only for databricks)**

user_defined_schema='column_name DATATYPE, Col2 DATATYPE...'

In [22]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [23]:
df.count()

17000

In [24]:
## REading data from multiple files to single dataframe
df2=spark.read.format('csv').options(inferSchema=True,header=True).load(['/content/sample_data/california_housing_train.csv',
                                                             '/content/sample_data/california_housing_test.csv'])
df2.count()

20000

In [25]:
df2.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [26]:
type(df['longitude'])
## columns are of sql.column type 

pyspark.sql.column.Column

In [27]:
type(df.head(2)[0])
## rows are of sql.row type

pyspark.sql.types.Row

### Selecting each column values can be shown by using

##### df.select('col1') 



In [28]:
df.select('longitude').show(5)

+---------+
|longitude|
+---------+
|  -114.31|
|  -114.47|
|  -114.56|
|  -114.57|
|  -114.57|
+---------+
only showing top 5 rows



In [29]:
df.select(['longitude','latitude']).show(10)

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -114.31|   34.19|
|  -114.47|    34.4|
|  -114.56|   33.69|
|  -114.57|   33.64|
|  -114.57|   33.57|
|  -114.58|   33.63|
|  -114.58|   33.61|
|  -114.59|   34.83|
|  -114.59|   33.61|
|   -114.6|   34.83|
+---------+--------+
only showing top 10 rows



In [30]:
## asDict() method allows to get the spark rows in dictionary form

df.head(2)[0].asDict()

{'longitude': -114.31,
 'latitude': 34.19,
 'housing_median_age': 15.0,
 'total_rooms': 5612.0,
 'total_bedrooms': 1283.0,
 'population': 1015.0,
 'households': 472.0,
 'median_income': 1.4936,
 'median_house_value': 66900.0}

### withColumn('column name', value) used to create new column in data or mainpulate 

its values 
### columns can be renamed using withColumnRenamed('old',  'new')

### drop column with df.drop('column name')

In [31]:
df.withColumn('room_per_person',df['total_rooms']/df['population']).limit(5).select(['room_per_person','total_rooms','population']).show()

+------------------+-----------+----------+
|   room_per_person|total_rooms|population|
+------------------+-----------+----------+
| 5.529064039408867|     5612.0|    1015.0|
| 6.775907883082374|     7650.0|    1129.0|
|2.1621621621621623|      720.0|     333.0|
|2.9145631067961166|     1501.0|     515.0|
|2.3301282051282053|     1454.0|     624.0|
+------------------+-----------+----------+



In [32]:
df.filter('total_rooms>1000  and median_house_value<60000').show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.59|   33.61|              34.0|     4789.0|        1175.0|    3134.0|    1056.0|       2.1782|           58400.0|
|   -114.6|   34.83|              46.0|     1497.0|         309.0|     787.0|     271.0|       2.1908|           48100.0|
|  -114.61|   34.84|              48.0|     1291.0|         248.0|     580.0|     211.0|       2.1571|           48600.0|
|  -114.63|   32.76|              15.0|     1448.0|         378.0|     949.0|     300.0|       0.8585|           45000.0|
|  -114.66|   32.74|              17.0|     1388.0|         386.0|     775.0|     320.0|       1.2049|           44000.0|
|  -114.68|   33.49|    

In [33]:
df.filter((df.total_rooms>6000) & (df.median_house_value<60000)).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -116.51|   34.45|              21.0|     8502.0|        2634.0|    2330.0|     991.0|       1.3811|           51300.0|
|  -116.57|   35.43|               8.0|     9975.0|        1743.0|    6835.0|    1439.0|       2.7138|           22500.0|
|  -117.29|   35.54|              35.0|     7922.0|        1636.0|    3431.0|    1329.0|       3.4145|           40400.0|
|  -117.37|   34.59|              39.0|     8193.0|        1747.0|    6852.0|    1597.0|       2.3832|           35000.0|
|   -118.9|   35.26|              31.0|     6145.0|        1492.0|    5666.0|    1457.0|       1.9066|           54600.0|
+---------+--------+----

### other commands for filter inclue

df.column.isNull()

df.column.isNotNull()

df.column.isin((a,b,c)

~df.column.isin(a,b,c...)

df.column.like('%cecemkmlce%)

df.column.contains('cddec')

df.column.startswith('a')

df.column.endswith('i)



In [34]:
d=[('Robert',23),('Alen',24),('Jason',20),('Peter',22),('June',22),('Alex',25),('Leo',24)]

df2=spark.createDataFrame(d,schema=['name','age'])

df2.show()


+------+---+
|  name|age|
+------+---+
|Robert| 23|
|  Alen| 24|
| Jason| 20|
| Peter| 22|
|  June| 22|
|  Alex| 25|
|   Leo| 24|
+------+---+



In [35]:
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [36]:
df2.filter(df2.name.like('%n%')).show()

+-----+---+
| name|age|
+-----+---+
| Alen| 24|
|Jason| 20|
| June| 22|
+-----+---+



In [37]:
df2.filter(df2.name.contains('e')).show()

+------+---+
|  name|age|
+------+---+
|Robert| 23|
|  Alen| 24|
| Peter| 22|
|  June| 22|
|  Alex| 25|
|   Leo| 24|
+------+---+



In [38]:
df2.filter(df2.name.startswith('A')).show()

+----+---+
|name|age|
+----+---+
|Alen| 24|
|Alex| 25|
+----+---+



In [39]:
df2.filter(df2.age.isin(22,24)).select(['name','age']).show()

+-----+---+
| name|age|
+-----+---+
| Alen| 24|
|Peter| 22|
| June| 22|
|  Leo| 24|
+-----+---+



### In order to create single column data frame specify the schema of each row then use to toDF('abc') to return the data as dataframe with mentioned column name.  Or use rename column 


In [40]:
df3=spark.createDataFrame(['SL','IN','US','GM','JP','GM','JP'],schema='string').toDF('country')
df3.show()

+-------+
|country|
+-------+
|     SL|
|     IN|
|     US|
|     GM|
|     JP|
|     GM|
|     JP|
+-------+



In [41]:
df4=spark.createDataFrame(['SL','IN','US','GM','JP','GM','JP'],schema='string')
df4.printSchema()
df4=df4.withColumnRenamed('value','country')
df4.printSchema()

root
 |-- value: string (nullable = true)

root
 |-- country: string (nullable = true)



In [42]:
from pyspark.sql.functions import lit, concat

## lit is a function used to convert any object passed to it into a Column datatype in

## concat is used to concat 2 or more columns together into 1 single column

df6=df2.withColumn('college mail',concat(lit('student_'),df2.name,lit('22_@xav.com')))
df6.show()

+------+---+--------------------+
|  name|age|        college mail|
+------+---+--------------------+
|Robert| 23|student_Robert22_...|
|  Alen| 24|student_Alen22_@x...|
| Jason| 20|student_Jason22_@...|
| Peter| 22|student_Peter22_@...|
|  June| 22|student_June22_@x...|
|  Alex| 25|student_Alex22_@x...|
|   Leo| 24|student_Leo22_@xa...|
+------+---+--------------------+



In [43]:
from pyspark.sql.functions import substring
df7=df6.withColumn('nick',substring(df6.name,2,4))
df7.show()

+------+---+--------------------+----+
|  name|age|        college mail|nick|
+------+---+--------------------+----+
|Robert| 23|student_Robert22_...|ober|
|  Alen| 24|student_Alen22_@x...| len|
| Jason| 20|student_Jason22_@...|ason|
| Peter| 22|student_Peter22_@...|eter|
|  June| 22|student_June22_@x...| une|
|  Alex| 25|student_Alex22_@x...| lex|
|   Leo| 24|student_Leo22_@xa...|  eo|
+------+---+--------------------+----+



In [44]:
from pyspark.sql.functions import column
df3.withColumn('test',df3.country.endswith('N')).show()

+-------+-----+
|country| test|
+-------+-----+
|     SL|false|
|     IN| true|
|     US|false|
|     GM|false|
|     JP|false|
|     GM|false|
|     JP|false|
+-------+-----+



#### drop is used to remove the column inplace 

df.drop('col name')
can used to drop only 1 column at once

df.drop('col1').drop('col2') can be used to used multiple columns at once.

In [45]:
df3.drop('test')
df3.show()

+-------+
|country|
+-------+
|     SL|
|     IN|
|     US|
|     GM|
|     JP|
|     GM|
|     JP|
+-------+



In [46]:
df3.sort('country').show()

+-------+
|country|
+-------+
|     GM|
|     GM|
|     IN|
|     JP|
|     JP|
|     SL|
|     US|
+-------+



In [47]:
df3.sort("country",ascending=False).show()

+-------+
|country|
+-------+
|     US|
|     SL|
|     JP|
|     JP|
|     IN|
|     GM|
|     GM|
+-------+



### Orders the data based on list of columns mentioned in order

#### agg is used to perform any aggregation over group of values in dataframe

In [56]:
df2.orderBy(['name','age'],ascending=[0,1]).show()
## ascending takes a list of value interpreted as True  or False applied to sort based on each column values

+------+---+
|  name|age|
+------+---+
|Robert| 23|
| Peter| 22|
|   Leo| 24|
|  June| 22|
| Jason| 20|
|  Alex| 25|
|  Alen| 24|
+------+---+



In [49]:
## corr --> gives correlation between 2 mentioned columns  
from pyspark.sql.functions import corr, dayofweek, dayofmonth, month, year 

df.agg(corr(df.median_house_value,df.total_rooms).alias('rooms_value_corr')).select('rooms_value_corr').show()

+-------------------+
|   rooms_value_corr|
+-------------------+
|0.13099146625326655|
+-------------------+



### GroupBy, agg (aggregrate), 'orderBy' operations


In [50]:
df0.show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  null|null| null|              [|
|     I|10.0| 8.04|           null|
|     I| 8.0| 6.95|           null|
|     I|13.0| 7.58|           null|
|     I| 9.0| 8.81|           null|
|     I|11.0| 8.33|           null|
|     I|14.0| 9.96|           null|
|     I| 6.0| 7.24|           null|
|     I| 4.0| 4.26|           null|
|     I|12.0|10.84|           null|
|     I| 7.0| 4.81|           null|
|     I| 5.0| 5.68|           null|
|    II|10.0| 9.14|           null|
|    II| 8.0| 8.14|           null|
|    II|13.0| 8.74|           null|
|    II| 9.0| 8.77|           null|
|    II|11.0| 9.26|           null|
|    II|14.0|  8.1|           null|
|    II| 6.0| 6.13|           null|
|    II| 4.0|  3.1|           null|
+------+----+-----+---------------+
only showing top 20 rows



In [51]:
df10=df0.select('X','Y','Series').groupBy('Series').mean().select('avg(X)','avg(Y)','Series')
df10.show()

+------+-----------------+------+
|avg(X)|           avg(Y)|Series|
+------+-----------------+------+
|  null|             null|  null|
|   9.0|7.500000000000001|   III|
|   9.0| 7.50090909090909|    IV|
|   9.0|7.500909090909091|    II|
|   9.0|              7.5|     I|
+------+-----------------+------+



In [52]:
df11=df0.select('X','Y','Series').groupBy('Series').max().select('max(X)','max(Y)','Series')
df11.show()

+------+------+------+
|max(X)|max(Y)|Series|
+------+------+------+
|  null|  null|  null|
|  14.0| 12.74|   III|
|  19.0|  12.5|    IV|
|  14.0|  9.26|    II|
|  14.0| 10.84|     I|
+------+------+------+



In [53]:
df11=df0.select('X','Y','Series').groupBy('Series').min().select('min(X)','min(Y)','Series')
df11.show()

+------+------+------+
|min(X)|min(Y)|Series|
+------+------+------+
|  null|  null|  null|
|   4.0|  5.39|   III|
|   8.0|  5.25|    IV|
|   4.0|   3.1|    II|
|   4.0|  4.26|     I|
+------+------+------+



In [54]:
## No of non null valued rows
df11=df0.select('X','Y','Series').groupBy('Series').count().select('count','Series')
df11.show()

+-----+------+
|count|Series|
+-----+------+
|    2|  null|
|   11|   III|
|   11|    IV|
|   11|    II|
|   11|     I|
+-----+------+



### aggregations syntax

df.agg({'column_name':'aggregation_operation','col2':'stddev',....})

In [78]:
## General convention create a dataframe with groupby and use the same data to apply multiple aggregations
from pyspark.sql.functions import round, countDistinct

group_data=df0.groupBy('Series')
d0=group_data.agg({'X':'mean','Y':'stddev'})
d0.select('Series', *[round(i,2) for i in d0.columns[1:]] ).show()

+------+----------------+-------------------+
|Series|round(avg(X), 2)|round(stddev(Y), 2)|
+------+----------------+-------------------+
|  null|            null|               null|
|   III|             9.0|               2.03|
|    IV|             9.0|               2.03|
|    II|             9.0|               2.03|
|     I|             9.0|               2.03|
+------+----------------+-------------------+

