Installing the dependencies for Spark

In [53]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz
!tar xf spark-3.0.3-bin-hadoop3.2.tgz
!pip install -q findspark

In [54]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

In [55]:
!pip install pyspark==3.0.3
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession



Creating a Spark session

In [56]:
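conf = SparkConf().set('spark.ui.port', '4050')
# Build the session from the conf. getOrCreate() reuses a running session,
# so re-running this cell does not try to start a second SparkContext.
spark = SparkSession.builder.master('local[*]').config(conf=conf).getOrCreate()
sc = spark.sparkContext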

Loading the Spark UI

In [57]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')

--2021-09-13 02:56:16--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 52.202.168.65, 54.237.133.81, 54.161.241.46, ...
Connecting to bin.equinox.io (bin.equinox.io)|52.202.168.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2021-09-13 02:56:16 (64.0 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   


Open the public_url shown in the output below to visit the Spark UI.

In [58]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://217d-35-245-217-213.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":1,"gauge":0,"rate1":2.1622517425246317e-22,"rate5":3.5113178295070544e-7,"rate15":0.0000524751278903042,"p50":30129163841,"p90":30129163841,"p95":30129163841,"p99":30129163841},"http":{"count":1,"rate1":1.3114719758582564e-22,"rate5":3.177171758754793e-7,"rate15":0.00005075478857035456,"p50":121095636,"p90":121095636,"p95":121095636,"p99":121095636}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://217d-35-245-217-213.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":64,"gauge":0,"rate1":2.635818750209139e-15,"rate5":0.00011521886636229623,"rate15":0.004820331653532405,"p50":38051690.5,"p90":10939248970,"p95":30253750061.5,"p99":38990183413},"http":{"count":14
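The raw JSON is hard to scan, so the public URL can also be pulled out in Python. The following is a minimal sketch, assuming the response keeps the shape shown above (a `tunnels` list whose entries carry a `public_url`). The helper name `extract_public_urls` and the abbreviated `sample` payload are illustrative and not part of the original notebook.

```python
import json


def extract_public_urls(payload):
    """Return the public_url of every tunnel in an ngrok /api/tunnels response."""
    data = json.loads(payload)
    return [tunnel["public_url"] for tunnel in data.get("tunnels", [])]


# Abbreviated payload shaped like the output above (most fields omitted).
sample = json.dumps({
    "tunnels": [
        {"proto": "https", "public_url": "https://217d-35-245-217-213.ngrok.io"},
        {"proto": "http", "public_url": "http://217d-35-245-217-213.ngrok.io"},
    ]
})

urls = extract_public_urls(sample)
print(urls)
```

In the notebook, the payload could be fetched with `urllib.request.urlopen('http://localhost:4040/api/tunnels').read()` and passed to the same helper.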

Creating the Spark DataFrame

In [59]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [60]:
schema=StructType(
    [
     StructField(name='city', dataType=StringType(), nullable=True),
     StructField(name='country',dataType=StringType(), nullable=True),
     StructField(name='counts',dataType=LongType(),nullable=False),
    ]
)

In [61]:
rows=[
      Row('Los Angeles','United States',3),
      Row('New York', 'United States', 1),
      Row('London','United Kingdom', 1),
]

In [62]:
parallelizeRows=spark.sparkContext.parallelize(rows)

In [63]:
df=spark.createDataFrame(parallelizeRows,schema)

In [64]:
df.show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|Los Angeles| United States|     3|
|   New York| United States|     1|
|     London|United Kingdom|     1|
+-----------+--------------+------+



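Creating a DataFrame from different types of files. The file names below are placeholders: replace them with real paths, since Spark raises an AnalysisException when a path does not exist.

In [22]:
# DataFrame from a CSV file (a separate name keeps the df built above intact)
df_csv=spark.read.csv('yourfile.csv',inferSchema=True,header=True)
# DataFrame from a JSON file
df_json=spark.read.json('yourfile.json')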

AnalysisException: ignored

Printing the schema

In [65]:
df.printSchema()

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- counts: long (nullable = false)



Manipulating columns: Columns in Spark are similar to columns in pandas. We can select, transform, and remove columns using expressions. A column cannot be manipulated outside the context of a DataFrame, so we apply Spark transformations to a DataFrame to modify it.

In [66]:
import pyspark.sql.functions as F

In [67]:
df.select('country').show(1)

+-------------+
|      country|
+-------------+
|United States|
+-------------+
only showing top 1 row



In [68]:
df.select(F.col('country')).show(1)

+-------------+
|      country|
+-------------+
|United States|
+-------------+
only showing top 1 row



In [69]:
df.select('country','city').show(1)

+-------------+-----------+
|      country|       city|
+-------------+-----------+
|United States|Los Angeles|
+-------------+-----------+
only showing top 1 row



Change the column name in the expression

In [70]:
df.select(F.expr('country as destination')).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [71]:
df.select(F.expr('country as destination').alias('country')).show(2)

+-------------+
|      country|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [72]:
new_df=df.selectExpr('country as new_country','country')
new_df.show()

+--------------+--------------+
|   new_country|       country|
+--------------+--------------+
| United States| United States|
| United States| United States|
|United Kingdom|United Kingdom|
+--------------+--------------+



In [73]:
new_df2=df.selectExpr('avg(counts)','count(distinct(country))')
new_df2.show()

+------------------+-----------------------+
|       avg(counts)|count(DISTINCT country)|
+------------------+-----------------------+
|1.6666666666666667|                      2|
+------------------+-----------------------+



Passing explicit values with literals

In [74]:
df.select(F.expr('*'),F.lit(1).alias('One')).show()

+-----------+--------------+------+---+
|       city|       country|counts|One|
+-----------+--------------+------+---+
|Los Angeles| United States|     3|  1|
|   New York| United States|     1|  1|
|     London|United Kingdom|     1|  1|
+-----------+--------------+------+---+



Adding a column

In [75]:
df=df.withColumn('One',F.lit(1))
df.show()

+-----------+--------------+------+---+
|       city|       country|counts|One|
+-----------+--------------+------+---+
|Los Angeles| United States|     3|  1|
|   New York| United States|     1|  1|
|     London|United Kingdom|     1|  1|
+-----------+--------------+------+---+



Renaming Columns

In [76]:
df=df.withColumnRenamed('One','ONE')
df.columns

['city', 'country', 'counts', 'ONE']

Removing Columns

In [77]:
df=df.drop('ONE')
df.columns

['city', 'country', 'counts']

DataFrame filtering

In [78]:
df.filter(F.col('counts')<2).show(1)

+--------+-------------+------+
|    city|      country|counts|
+--------+-------------+------+
|New York|United States|     1|
+--------+-------------+------+
only showing top 1 row



To apply several filters, chain them. Spark evaluates transformations lazily, so the chained conditions are combined and applied together (as a logical AND) when an action such as show() runs.

In [79]:
df.where(F.col('counts')<=1).where(F.col('country')!='United States').show()

+------+--------------+------+
|  city|       country|counts|
+------+--------------+------+
|London|United Kingdom|     1|
+------+--------------+------+



Getting the distinct rows

In [80]:
df.select('city').distinct().count()

3

Getting random samples

In [81]:
df.sample(withReplacement=False,fraction=1.0,seed=5).count()

3

Random splits

In [82]:
df2=df.randomSplit([0.67,0.33],seed=5)
print(df.count())
print(df2[0].count())
print(df2[1].count())

3
2
1


DataFrames are immutable, so to append rows, create a new DataFrame with the extra rows and union it with the existing one.

In [83]:
rows=[
      Row('Berlin','Germany',2),
      Row('Bucharest','Romania',2),
]
parallelizeRows=spark.sparkContext.parallelize(rows)
df2=spark.createDataFrame(parallelizeRows,schema)

In [84]:
df3=df.union(df2)

In [85]:
df3.createOrReplaceTempView('new_df')
df3.show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|Los Angeles| United States|     3|
|   New York| United States|     1|
|     London|United Kingdom|     1|
|     Berlin|       Germany|     2|
|  Bucharest|       Romania|     2|
+-----------+--------------+------+



Limiting the number of rows returned from the DataFrame

In [86]:
df.limit(3).show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|Los Angeles| United States|     3|
|   New York| United States|     1|
|     London|United Kingdom|     1|
+-----------+--------------+------+

