<a href="https://colab.research.google.com/github/itsmevishnu/python/blob/master/learn_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Basic Introduction to Pyspark
---
1. Installation of Pyspark package in python
2. Import `pytspark`, `pyspark.sql.SparkSession` for creating new spark session.
3. Creating new spark session
4. Reading csv files
5. Reading csv file with header values.
6. Reading csv file with header and inferSchema.[With different data types instead of default string types]
7. Basic methods and properties like `show()`, `printSchema()`, `head()`, `columns`

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pyspark 

In [None]:
import pandas as pd
df =  pd.read_csv('organisations.csv')

In [None]:
df.head()

Unnamed: 0,Index,Organization Id,Name,Website,Country,Description,Founded,Industry,Number of employees
0,1,FAB0d41d5b5d22c,Ferrell LLC,https://price.net/,Papua New Guinea,Horizontal empowering knowledgebase,1990,Plastics,3498
1,2,6A7EdDEA9FaDC52,"Mckinney, Riley and Day",http://www.hall-buchanan.info/,Finland,User-centric system-worthy leverage,2015,Glass / Ceramics / Concrete,4952
2,3,0bFED1ADAE4bcC1,Hester Ltd,http://sullivan-reed.com/,China,Switchable scalable moratorium,1971,Public Safety,5287
3,4,2bFC1Be8a4ce42f,Holder-Sellers,https://becker.com/,Turkmenistan,De-engineered systemic artificial intelligence,2004,Automotive,921
4,5,9eE8A6a4Eb96C24,Mayer Group,http://www.brewer.com/,Mauritius,Synchronized needs-based challenge,1991,Transportation,7870


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('practice').getOrCreate()

In [None]:
spark

In [None]:
df_pyspark = spark.read.csv('organisations.csv') #Reading the csv files

In [None]:
df_pyspark #There is header is start with C0, C1, etc

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]

In [None]:
df_pyspark.show() #to display the details of the table

+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|  _c0|            _c1|                 _c2|                 _c3|                 _c4|                 _c5|    _c6|                 _c7|                _c8|
+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|Index|Organization Id|                Name|             Website|             Country|         Description|Founded|            Industry|Number of employees|
|    1|FAB0d41d5b5d22c|         Ferrell LLC|  https://price.net/|    Papua New Guinea|Horizontal empowe...|   1990|            Plastics|               3498|
|    2|6A7EdDEA9FaDC52|Mckinney, Riley a...|http://www.hall-b...|             Finland|User-centric syst...|   2015|Glass / Ceramics ...|               4952|
|    3|0bFED1ADAE4bcC1|          Hester Ltd|http://sulliva

In [None]:
df = spark.read.option('header', 'true').csv('organisations.csv') #the first row will be header instead of c0,c1

In [None]:
df.show()

+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|Index|Organization Id|                Name|             Website|             Country|         Description|Founded|            Industry|Number of employees|
+-----+---------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
|    1|FAB0d41d5b5d22c|         Ferrell LLC|  https://price.net/|    Papua New Guinea|Horizontal empowe...|   1990|            Plastics|               3498|
|    2|6A7EdDEA9FaDC52|Mckinney, Riley a...|http://www.hall-b...|             Finland|User-centric syst...|   2015|Glass / Ceramics ...|               4952|
|    3|0bFED1ADAE4bcC1|          Hester Ltd|http://sullivan-r...|               China|Switchable scalab...|   1971|       Public Safety|               5287|
|    4|2bFC1Be8a4ce42f|      Holder-Sellers| https://becke

In [None]:
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
df.head()

Row(Index='1', Organization Id='FAB0d41d5b5d22c', Name='Ferrell LLC', Website='https://price.net/', Country='Papua New Guinea', Description='Horizontal empowering knowledgebase', Founded='1990', Industry='Plastics', Number of employees='3498')

In [None]:
df.printSchema() #Give more the information of the data, check schema, similar to pd.info()
# by default all the values as string. 

root
 |-- Index: string (nullable = true)
 |-- Organization Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Website: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Founded: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Number of employees: string (nullable = true)



In [None]:
#To modify the schema datatype while reading, add inferSchema=True during reading
df = spark.read.option('header', 'true').csv('organisations.csv', inferSchema=True)

In [None]:
#another easy method similar to pandas
df= spark.read.csv('organisations.csv',inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Organization Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Website: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Founded: integer (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Number of employees: integer (nullable = true)



In [None]:
df.head(10)

[Row(Index=1, Organization Id='FAB0d41d5b5d22c', Name='Ferrell LLC', Website='https://price.net/', Country='Papua New Guinea', Description='Horizontal empowering knowledgebase', Founded=1990, Industry='Plastics', Number of employees=3498),
 Row(Index=2, Organization Id='6A7EdDEA9FaDC52', Name='Mckinney, Riley and Day', Website='http://www.hall-buchanan.info/', Country='Finland', Description='User-centric system-worthy leverage', Founded=2015, Industry='Glass / Ceramics / Concrete', Number of employees=4952),
 Row(Index=3, Organization Id='0bFED1ADAE4bcC1', Name='Hester Ltd', Website='http://sullivan-reed.com/', Country='China', Description='Switchable scalable moratorium', Founded=1971, Industry='Public Safety', Number of employees=5287),
 Row(Index=4, Organization Id='2bFC1Be8a4ce42f', Name='Holder-Sellers', Website='https://becker.com/', Country='Turkmenistan', Description='De-engineered systemic artificial intelligence', Founded=2004, Industry='Automotive', Number of employees=921),

In [None]:
df.columns #Get the names of the coloumns

['Index',
 'Organization Id',
 'Name',
 'Website',
 'Country',
 'Description',
 'Founded',
 'Industry',
 'Number of employees']

## Basic operations 
---
1. Select a row using `select()`
2. `dtypes` for showing the datatypes as pandas.
3. Describe the data available(similar to pandas `describe()`) --> `describe()`
4. Create new column with `withColumn("name_of_col", values)`
5. Drop a coloumn is by `drop('name_of_col')`. By default is by column.
6. Rename the coloumn by `withColumnRenamed('old_name', 'mew_name')

In [None]:
df.select("Name").show() # Get the details of the column

+--------------------+
|                Name|
+--------------------+
|         Ferrell LLC|
|Mckinney, Riley a...|
|          Hester Ltd|
|      Holder-Sellers|
|         Mayer Group|
|      Henry-Thompson|
|      Hansen-Everett|
|       Mcintosh-Mora|
|            Carr Inc|
|          Gaines Inc|
|          Kidd Group|
|        Crane-Clarke|
|Keller, Campos an...|
|         Glover-Pope|
|      Pacheco-Spears|
|         Hodge-Ayers|
|Bowers, Guerra an...|
|     Mckenzie-Melton|
|         Branch-Mann|
|      Weiss and Sons|
+--------------------+
only showing top 20 rows



In [None]:
df.select(["Name", "Country"]).show() #Get the multiple columns of the dataset

+--------------------+--------------------+
|                Name|             Country|
+--------------------+--------------------+
|         Ferrell LLC|    Papua New Guinea|
|Mckinney, Riley a...|             Finland|
|          Hester Ltd|               China|
|      Holder-Sellers|        Turkmenistan|
|         Mayer Group|           Mauritius|
|      Henry-Thompson|             Bahamas|
|      Hansen-Everett|            Pakistan|
|       Mcintosh-Mora|Heard Island and ...|
|            Carr Inc|              Kuwait|
|          Gaines Inc|          Uzbekistan|
|          Kidd Group|Bouvet Island (Bo...|
|        Crane-Clarke|             Denmark|
|Keller, Campos an...|             Liberia|
|         Glover-Pope|United Arab Emirates|
|      Pacheco-Spears|              Sweden|
|         Hodge-Ayers|            Honduras|
|Bowers, Guerra an...|              Uganda|
|     Mckenzie-Melton|           Hong Kong|
|         Branch-Mann|            Botswana|
|      Weiss and Sons|          

In [None]:
df["Name"]

Column<'Name'>

In [None]:
df.dtypes #identify the datatypes of each columns similar to pandas 

[('Index', 'int'),
 ('Organization Id', 'string'),
 ('Name', 'string'),
 ('Website', 'string'),
 ('Country', 'string'),
 ('Description', 'string'),
 ('Founded', 'int'),
 ('Industry', 'string'),
 ('Number of employees', 'int')]

In [None]:
df.describe().show() #similar to the pandas describe()  -providing the statistical information

+-------+------------------+--------------------+--------------------+--------+--------------------+-----------------+-----------------+-------------------+------------------+
|summary|             Index|                Name|             Website| Country|         Description|          Founded|         Industry|Number of employees|       Current age|
+-------+------------------+--------------------+--------------------+--------+--------------------+-----------------+-----------------+-------------------+------------------+
|  count|               100|                 100|                 100|     100|                 100|              100|              100|                100|               100|
|   mean|              50.5|                null|                null|    null|                null|          1995.41|             null|            4964.86|             27.59|
| stddev|29.011491975882016|                null|                null|    null|                null|15.74422773814921|  

In [None]:
## adding columns to the dataset 
df = df.withColumn('Current age', 2023-df["Founded"] )

In [None]:
##drop the columns, drop fucntion can be used, by default it take the column
df = df.drop("Organization Id")

In [None]:
## Rename the column head withColumnRenamed(OldName, Newname)
df  = df.withColumnRenamed('Number of employees', 'Employees')

In [None]:
df.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Website: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Founded: integer (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Employees: integer (nullable = true)
 |-- Current age: integer (nullable = true)



## Filters in Pyspark

In [None]:
#delete the null values
df.na.drop().show() #delete the rows with null values
#how
df.na.drop(how="all") #if all the values of the row is null, then it will delete
df.na.drop(how="any") #if any null value is there, then delete the entire row
#default value = any

#thresh
df.na.drop(thresh=2) #Atleadt two non null values should be there to not delete  
#the row. If the non null values are less than 2, then delete rows.
#default value is 0

#subset
df.na.drop(subset=["Name"]) # if there any null in the Name column, those rows will 
#be delete


+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|Index|                Name|             Website|             Country|         Description|Founded|            Industry|Employees|Current age|
+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|    1|         Ferrell LLC|  https://price.net/|    Papua New Guinea|Horizontal empowe...|   1990|            Plastics|     3498|         33|
|    2|Mckinney, Riley a...|http://www.hall-b...|             Finland|User-centric syst...|   2015|Glass / Ceramics ...|     4952|          8|
|    3|          Hester Ltd|http://sullivan-r...|               China|Switchable scalab...|   1971|       Public Safety|     5287|         52|
|    4|      Holder-Sellers| https://becker.com/|        Turkmenistan|De-engineered sys...|   2004|          Automotive|      921|         19|

In [None]:
## fill the missing value.
df.na.fill('Missing') # all null values changed with Missing
df.na.fill("Missing", "Name") # Null values in the name column replaced with the word Missing


DataFrame[Index: int, Name: string, Website: string, Country: string, Description: string, Founded: int, Industry: string, Employees: int, Current age: int]

In [None]:
#filter operations
df.show()

+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|Index|                Name|             Website|             Country|         Description|Founded|            Industry|Employees|Current age|
+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|    1|         Ferrell LLC|  https://price.net/|    Papua New Guinea|Horizontal empowe...|   1990|            Plastics|     3498|         33|
|    2|Mckinney, Riley a...|http://www.hall-b...|             Finland|User-centric syst...|   2015|Glass / Ceramics ...|     4952|          8|
|    3|          Hester Ltd|http://sullivan-r...|               China|Switchable scalab...|   1971|       Public Safety|     5287|         52|
|    4|      Holder-Sellers| https://becker.com/|        Turkmenistan|De-engineered sys...|   2004|          Automotive|      921|         19|

In [None]:
df.filter("Founded<1990").show()

+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|Index|                Name|             Website|             Country|         Description|Founded|            Industry|Employees|Current age|
+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|    3|          Hester Ltd|http://sullivan-r...|               China|Switchable scalab...|   1971|       Public Safety|     5287|         52|
|    8|       Mcintosh-Mora|https://www.brook...|Heard Island and ...|Centralized attit...|   1970|     Import / Export|     4389|         53|
|   15|      Pacheco-Spears|https://aguilar.com/|              Sweden|Secured logistica...|   1984|            Maritime|      769|         39|
|   17|Bowers, Guerra an...|http://www.carril...|              Uganda|De-engineered tra...|   1972|Primary / Seconda...|     6986|         51|

In [None]:
df.filter("Founded>2010").select(["Name", "Country", "Founded"]).show()

+--------------------+--------------------+-------+
|                Name|             Country|Founded|
+--------------------+--------------------+-------+
|Mckinney, Riley a...|             Finland|   2015|
|      Hansen-Everett|            Pakistan|   2018|
|        Crane-Clarke|             Denmark|   2014|
|Keller, Campos an...|             Liberia|   2020|
|         Glover-Pope|United Arab Emirates|   2013|
|      Weiss and Sons|               Korea|   2011|
|         Harrell LLC|          Guadeloupe|   2018|
|Eaton, Reynolds a...|              Monaco|   2014|
|Greene, Benjamin ...|             Romania|   2012|
|           Ayala LLC|         Philippines|   2021|
|Glass, Barrera an...|     Kyrgyz Republic|   2020|
|Baker, Mccann and...|               Kenya|   2013|
|            Hahn PLC|             Belarus|   2012|
|Mitchell, Warren ...| Trinidad and Tobago|   2021|
|          Prince PLC|              Sweden|   2016|
|Wallace, Madden a...|             Germany|   2016|
|     Lawson

+-----+----+-------+-------+-----------+-------+--------+---------+-----------+
|Index|Name|Website|Country|Description|Founded|Industry|Employees|Current age|
+-----+----+-------+-------+-----------+-------+--------+---------+-----------+
+-----+----+-------+-------+-----------+-------+--------+---------+-----------+



In [None]:
df.filter((df["Country"] == "Germany") & (df["Founded"]<2000)).show()

+-----+----+-------+-------+-----------+-------+--------+---------+-----------+
|Index|Name|Website|Country|Description|Founded|Industry|Employees|Current age|
+-----+----+-------+-------+-----------+-------+--------+---------+-----------+
+-----+----+-------+-------+-----------+-------+--------+---------+-----------+



In [None]:
df.filter(~(df["Founded"]<2000)).show()  #inverse operator: Make the inverse of the condition

+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|Index|                Name|             Website|             Country|         Description|Founded|            Industry|Employees|Current age|
+-----+--------------------+--------------------+--------------------+--------------------+-------+--------------------+---------+-----------+
|    2|Mckinney, Riley a...|http://www.hall-b...|             Finland|User-centric syst...|   2015|Glass / Ceramics ...|     4952|          8|
|    4|      Holder-Sellers| https://becker.com/|        Turkmenistan|De-engineered sys...|   2004|          Automotive|      921|         19|
|    7|      Hansen-Everett|https://www.kidd....|            Pakistan|Seamless disinter...|   2018| Publishing Industry|     7832|          5|
|   11|          Kidd Group|http://www.lyons....|Bouvet Island (Bo...|Proactive foregro...|   2001|Primary / Seconda...|     7473|         22|