# PySpark basics in python
  PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) and Spark Core.


### Initializing SparkSession
PySpark applications start with initializing SparkSession which is the entry point of PySpark

In [1]:
# importing the SparkSession from pyspark
from pyspark.sql import SparkSession

In [2]:
# getting the instance of SparkSession of App Name First_App
spark = SparkSession.builder.appName('First_App').getOrCreate()

In [6]:
spark

### Reading Pokemon Data Set from CSV file 

In [3]:
# reading the csv file of pokemon data set into spark
# without header it will not be able to header data as column names. it will take like c0,c1 etc
# inferSchema is required to take the data type correctly else everything will be in string data type by default
df_spark = spark.read.csv('pokemon_data.csv',header=True,inferSchema=True)


### To Display the read CSV 

In [4]:
# displaying the data read from the csv
df_spark.show()

+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  #|                Name|Type 1|Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  1|           Bulbasaur| Grass|Poison| 45|    49|     49|     65|     65|   45|         1|    FALSE|
|  2|             Ivysaur| Grass|Poison| 60|    62|     63|     80|     80|   60|         1|    FALSE|
|  3|            Venusaur| Grass|Poison| 80|    82|     83|    100|    100|   80|         1|    FALSE|
|  3|VenusaurMega Venu...| Grass|Poison| 80|   100|    123|    122|    120|   80|         1|    FALSE|
|  4|          Charmander|  Fire|  null| 39|    52|     43|     60|     50|   65|         1|    FALSE|
|  5|          Charmeleon|  Fire|  null| 58|    64|     58|     80|     65|   80|         1|    FALSE|
|  6|           Charizard|  Fire|Flying| 78|    84|     78|    109|     8

In [7]:
# to display the first row
df_spark.show(1)

+---+---------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  #|     Name|Type 1|Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+---------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  1|Bulbasaur| Grass|Poison| 45|    49|     49|     65|     65|   45|         1|    FALSE|
+---+---------+------+------+---+------+-------+-------+-------+-----+----------+---------+
only showing top 1 row



### Configuration for eager evaluation of pyspark Dataframe
Alternatively, you can enable `spark.sql.repl.eagerEval.enabled` configuration for the eager evaluation of PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can be controlled via `spark.sql.repl.eagerEval.maxNumRows` configuration.

In [5]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [10]:
df_spark

#,Name,Type 1,Type 2,HP,Attack,Defense,Sp. Atk,Sp. Def,Speed,Generation,Legendary
1,Bulbasaur,Grass,Poison,45,49,49,65,65,45,1,False
2,Ivysaur,Grass,Poison,60,62,63,80,80,60,1,False
3,Venusaur,Grass,Poison,80,82,83,100,100,80,1,False
3,VenusaurMega Venu...,Grass,Poison,80,100,123,122,120,80,1,False
4,Charmander,Fire,,39,52,43,60,50,65,1,False
5,Charmeleon,Fire,,58,64,58,80,65,80,1,False
6,Charizard,Fire,Flying,78,84,78,109,85,100,1,False
6,CharizardMega Cha...,Fire,Dragon,78,130,111,130,85,100,1,False
6,CharizardMega Cha...,Fire,Flying,78,104,78,159,115,100,1,False
7,Squirtle,Water,,44,48,65,50,64,43,1,False


In [12]:
# to display the details of the row vertically
df_spark.show(1, vertical=True)

-RECORD 0---------------
 #          | 1         
 Name       | Bulbasaur 
 Type 1     | Grass     
 Type 2     | Poison    
 HP         | 45        
 Attack     | 49        
 Defense    | 49        
 Sp. Atk    | 65        
 Sp. Def    | 65        
 Speed      | 45        
 Generation | 1         
 Legendary  | FALSE     
only showing top 1 row



In [19]:
# to read the head 5 rows
df_spark.head(5)

[Row(#='1', Name='Bulbasaur', Type 1='Grass', Type 2='Poison', HP='45', Attack='49', Defense='49', Sp. Atk='65', Sp. Def='65', Speed='45', Generation='1', Legendary='FALSE'),
 Row(#='2', Name='Ivysaur', Type 1='Grass', Type 2='Poison', HP='60', Attack='62', Defense='63', Sp. Atk='80', Sp. Def='80', Speed='60', Generation='1', Legendary='FALSE'),
 Row(#='3', Name='Venusaur', Type 1='Grass', Type 2='Poison', HP='80', Attack='82', Defense='83', Sp. Atk='100', Sp. Def='100', Speed='80', Generation='1', Legendary='FALSE'),
 Row(#='3', Name='VenusaurMega Venusaur', Type 1='Grass', Type 2='Poison', HP='80', Attack='100', Defense='123', Sp. Atk='122', Sp. Def='120', Speed='80', Generation='1', Legendary='FALSE'),
 Row(#='4', Name='Charmander', Type 1='Fire', Type 2=None, HP='39', Attack='52', Defense='43', Sp. Atk='60', Sp. Def='50', Speed='65', Generation='1', Legendary='FALSE')]

In [20]:
# to get the last 5 rows of the data frame 
df_spark.tail(5)

[Row(#='719', Name='Diancie', Type 1='Rock', Type 2='Fairy', HP='50', Attack='100', Defense='150', Sp. Atk='100', Sp. Def='150', Speed='50', Generation='6', Legendary='TRUE'),
 Row(#='719', Name='DiancieMega Diancie', Type 1='Rock', Type 2='Fairy', HP='50', Attack='160', Defense='110', Sp. Atk='160', Sp. Def='110', Speed='110', Generation='6', Legendary='TRUE'),
 Row(#='720', Name='HoopaHoopa Confined', Type 1='Psychic', Type 2='Ghost', HP='80', Attack='110', Defense='60', Sp. Atk='150', Sp. Def='130', Speed='70', Generation='6', Legendary='TRUE'),
 Row(#='720', Name='HoopaHoopa Unbound', Type 1='Psychic', Type 2='Dark', HP='80', Attack='160', Defense='60', Sp. Atk='170', Sp. Def='130', Speed='80', Generation='6', Legendary='TRUE'),
 Row(#='721', Name='Volcanion', Type 1='Fire', Type 2='Water', HP='80', Attack='110', Defense='120', Sp. Atk='130', Sp. Def='90', Speed='70', Generation='6', Legendary='TRUE')]

In [21]:
# to return the number of rows in data frame
df_spark.count()

800

In [11]:
# to see the type 
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [12]:
# to get the data types of each column
df_spark.dtypes

[('#', 'int'),
 ('Name', 'string'),
 ('Type 1', 'string'),
 ('Type 2', 'string'),
 ('HP', 'int'),
 ('Attack', 'int'),
 ('Defense', 'int'),
 ('Sp. Atk', 'int'),
 ('Sp. Def', 'int'),
 ('Speed', 'int'),
 ('Generation', 'int'),
 ('Legendary', 'boolean')]

### To get the Schema of the data frame

In [5]:
df_spark.printSchema()

root
 |-- #: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Type 1: string (nullable = true)
 |-- Type 2: string (nullable = true)
 |-- HP: string (nullable = true)
 |-- Attack: string (nullable = true)
 |-- Defense: string (nullable = true)
 |-- Sp. Atk: string (nullable = true)
 |-- Sp. Def: string (nullable = true)
 |-- Speed: string (nullable = true)
 |-- Generation: string (nullable = true)
 |-- Legendary: string (nullable = true)



### Conversion of pyspark Data Frame into pandas Data Frame

In [6]:
import pandas

In [7]:
df_spark.toPandas()

Unnamed: 0,#,Name,Type 1,Type 2,HP,Attack,Defense,Sp. Atk,Sp. Def,Speed,Generation,Legendary
0,1,Bulbasaur,Grass,Poison,45,49,49,65,65,45,1,FALSE
1,2,Ivysaur,Grass,Poison,60,62,63,80,80,60,1,FALSE
2,3,Venusaur,Grass,Poison,80,82,83,100,100,80,1,FALSE
3,3,VenusaurMega Venusaur,Grass,Poison,80,100,123,122,120,80,1,FALSE
4,4,Charmander,Fire,,39,52,43,60,50,65,1,FALSE
...,...,...,...,...,...,...,...,...,...,...,...,...
795,719,Diancie,Rock,Fairy,50,100,150,100,150,50,6,TRUE
796,719,DiancieMega Diancie,Rock,Fairy,50,160,110,160,110,110,6,TRUE
797,720,HoopaHoopa Confined,Psychic,Ghost,80,110,60,150,130,70,6,TRUE
798,720,HoopaHoopa Unbound,Psychic,Dark,80,160,60,170,130,80,6,TRUE


### Selecting and Accessing the data in the pyspark

In [10]:
# to get all Column Details
df_spark.columns

['#',
 'Name',
 'Type 1',
 'Type 2',
 'HP',
 'Attack',
 'Defense',
 'Sp. Atk',
 'Sp. Def',
 'Speed',
 'Generation',
 'Legendary']

In [11]:
df_spark.Name

Column<'Name'>

In [13]:
#Selecting the column and displaying them from data frame 
df_spark.select(df_spark.Name).show(10)
# we can access the same with alternative ways
# df_spark.select(df_spark["Name"]).show(10)
# df_spark.select("Name").show(10)


+--------------------+
|                Name|
+--------------------+
|           Bulbasaur|
|             Ivysaur|
|            Venusaur|
|VenusaurMega Venu...|
|          Charmander|
|          Charmeleon|
|           Charizard|
|CharizardMega Cha...|
|CharizardMega Cha...|
|            Squirtle|
+--------------------+
only showing top 10 rows



In [14]:
# to convert the data into uppercase
from pyspark.sql.functions import upper
# to add new column into the data frame
df_spark.withColumn('Name UpperCase',upper(df_spark.Name)).show()

+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+--------------------+
|  #|                Name|Type 1|Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|      Name UpperCase|
+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+--------------------+
|  1|           Bulbasaur| Grass|Poison| 45|    49|     49|     65|     65|   45|         1|    false|           BULBASAUR|
|  2|             Ivysaur| Grass|Poison| 60|    62|     63|     80|     80|   60|         1|    false|             IVYSAUR|
|  3|            Venusaur| Grass|Poison| 80|    82|     83|    100|    100|   80|         1|    false|            VENUSAUR|
|  3|VenusaurMega Venu...| Grass|Poison| 80|   100|    123|    122|    120|   80|         1|    false|VENUSAURMEGA VENU...|
|  4|          Charmander|  Fire|  null| 39|    52|     43|     60|     50|   65|         1|    false|          CHARMANDER|
|  5|   

In [15]:
# to drop the column and get the rest of data via collect method
df_spark.drop('Name UpperCase').collect()

[Row(#=1, Name='Bulbasaur', Type 1='Grass', Type 2='Poison', HP=45, Attack=49, Defense=49, Sp. Atk=65, Sp. Def=65, Speed=45, Generation=1, Legendary=False),
 Row(#=2, Name='Ivysaur', Type 1='Grass', Type 2='Poison', HP=60, Attack=62, Defense=63, Sp. Atk=80, Sp. Def=80, Speed=60, Generation=1, Legendary=False),
 Row(#=3, Name='Venusaur', Type 1='Grass', Type 2='Poison', HP=80, Attack=82, Defense=83, Sp. Atk=100, Sp. Def=100, Speed=80, Generation=1, Legendary=False),
 Row(#=3, Name='VenusaurMega Venusaur', Type 1='Grass', Type 2='Poison', HP=80, Attack=100, Defense=123, Sp. Atk=122, Sp. Def=120, Speed=80, Generation=1, Legendary=False),
 Row(#=4, Name='Charmander', Type 1='Fire', Type 2=None, HP=39, Attack=52, Defense=43, Sp. Atk=60, Sp. Def=50, Speed=65, Generation=1, Legendary=False),
 Row(#=5, Name='Charmeleon', Type 1='Fire', Type 2=None, HP=58, Attack=64, Defense=58, Sp. Atk=80, Sp. Def=65, Speed=80, Generation=1, Legendary=False),
 Row(#=6, Name='Charizard', Type 1='Fire', Type 2='

### To Filter the pokemon data and get the pokemon which are legendary

In [13]:
df_spark.filter(df_spark.Legendary ==True).show()

+---+-------------------+--------+--------+---+------+-------+-------+-------+-----+----------+---------+
|  #|               Name|  Type 1|  Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+-------------------+--------+--------+---+------+-------+-------+-------+-----+----------+---------+
|144|           Articuno|     Ice|  Flying| 90|    85|    100|     95|    125|   85|         1|     true|
|145|             Zapdos|Electric|  Flying| 90|    90|     85|    125|     90|  100|         1|     true|
|146|            Moltres|    Fire|  Flying| 90|   100|     90|    125|     85|   90|         1|     true|
|150|             Mewtwo| Psychic|    null|106|   110|     90|    154|     90|  130|         1|     true|
|150|MewtwoMega Mewtwo X| Psychic|Fighting|106|   190|    100|    154|    100|  130|         1|     true|
|150|MewtwoMega Mewtwo Y| Psychic|    null|106|   150|     70|    194|    120|  140|         1|     true|
|243|             Raikou|Electric|    null| 90

### Filter the pokemon which are legendary and have Attack power over 100

In [19]:
df_spark.filter((df_spark.Legendary==True) & (df_spark.Attack>100)).show()

+---+--------------------+-------+--------+---+------+-------+-------+-------+-----+----------+---------+
|  #|                Name| Type 1|  Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+--------------------+-------+--------+---+------+-------+-------+-------+-----+----------+---------+
|150|              Mewtwo|Psychic|    null|106|   110|     90|    154|     90|  130|         1|     true|
|150| MewtwoMega Mewtwo X|Psychic|Fighting|106|   190|    100|    154|    100|  130|         1|     true|
|150| MewtwoMega Mewtwo Y|Psychic|    null|106|   150|     70|    194|    120|  140|         1|     true|
|244|               Entei|   Fire|    null|115|   115|     85|     90|     75|  100|         2|     true|
|250|               Ho-oh|   Fire|  Flying|106|   130|     90|    110|    154|   90|         2|     true|
|381|   LatiosMega Latios| Dragon| Psychic| 80|   130|    100|    160|    120|  110|         3|     true|
|382| KyogrePrimal Kyogre|  Water|    null|100

#### To use or condition in filter in data frame

In [20]:
df_spark.filter((df_spark.Defense>=80) | (df_spark.Defense<=100)).show()

+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  #|                Name|Type 1|Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  1|           Bulbasaur| Grass|Poison| 45|    49|     49|     65|     65|   45|         1|    false|
|  2|             Ivysaur| Grass|Poison| 60|    62|     63|     80|     80|   60|         1|    false|
|  3|            Venusaur| Grass|Poison| 80|    82|     83|    100|    100|   80|         1|    false|
|  3|VenusaurMega Venu...| Grass|Poison| 80|   100|    123|    122|    120|   80|         1|    false|
|  4|          Charmander|  Fire|  null| 39|    52|     43|     60|     50|   65|         1|    false|
|  5|          Charmeleon|  Fire|  null| 58|    64|     58|     80|     65|   80|         1|    false|
|  6|           Charizard|  Fire|Flying| 78|    84|     78|    109|     8

#### to use not condition to filter and show the pokemon which are not legendary

In [22]:
df_spark.filter(~(df_spark.Legendary== True)).show()

+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  #|                Name|Type 1|Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+--------------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  1|           Bulbasaur| Grass|Poison| 45|    49|     49|     65|     65|   45|         1|    false|
|  2|             Ivysaur| Grass|Poison| 60|    62|     63|     80|     80|   60|         1|    false|
|  3|            Venusaur| Grass|Poison| 80|    82|     83|    100|    100|   80|         1|    false|
|  3|VenusaurMega Venu...| Grass|Poison| 80|   100|    123|    122|    120|   80|         1|    false|
|  4|          Charmander|  Fire|  null| 39|    52|     43|     60|     50|   65|         1|    false|
|  5|          Charmeleon|  Fire|  null| 58|    64|     58|     80|     65|   80|         1|    false|
|  6|           Charizard|  Fire|Flying| 78|    84|     78|    109|     8

### Sorting the pokemon data with high attack in descending order

In [16]:
df_spark.sort("Attack",ascending=False).show(truncate=False)
# df_spark.sort(df_spark.Attack.desc)

+---+-----------------------+-------+--------+---+------+-------+-------+-------+-----+----------+---------+
|#  |Name                   |Type 1 |Type 2  |HP |Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+-----------------------+-------+--------+---+------+-------+-------+-------+-----+----------+---------+
|150|MewtwoMega Mewtwo X    |Psychic|Fighting|106|190   |100    |154    |100    |130  |1         |true     |
|214|HeracrossMega Heracross|Bug    |Fighting|80 |185   |115    |40     |105    |75   |2         |false    |
|383|GroudonPrimal Groudon  |Ground |Fire    |100|180   |160    |150    |90     |90   |3         |true     |
|384|RayquazaMega Rayquaza  |Dragon |Flying  |105|180   |100    |180    |100    |115  |3         |true     |
|386|DeoxysAttack Forme     |Psychic|null    |50 |180   |20     |180    |20     |150  |3         |true     |
|646|KyuremBlack Kyurem     |Dragon |Ice     |125|170   |100    |120    |90     |95   |5         |true     |
|445|GarchompMega G

### To drop the rows which have atleast one null value in them

In [None]:
df_spark.na.drop().show()

#### To drop the rows which have all values in a row is null
```
df_spark.na.drop(how="all").show()
```

#### To drop the rows that have more than two null values in column
```
df_spark.na.drop(how="any",thresh=2).show()
```

#### To drop the rows that have null in particular column for example in Type 2 column of pokemon dataset
```
df_spark.na.drop(how="any",subset=['Type 2']).show()
```

### To fill the null values in Column `Type 2` with value 'Pokemon'

In [17]:
df_spark.na.fill('pokemon',['Type 2']).show()

+---+--------------------+------+-------+---+------+-------+-------+-------+-----+----------+---------+
|  #|                Name|Type 1| Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+--------------------+------+-------+---+------+-------+-------+-------+-----+----------+---------+
|  1|           Bulbasaur| Grass| Poison| 45|    49|     49|     65|     65|   45|         1|    false|
|  2|             Ivysaur| Grass| Poison| 60|    62|     63|     80|     80|   60|         1|    false|
|  3|            Venusaur| Grass| Poison| 80|    82|     83|    100|    100|   80|         1|    false|
|  3|VenusaurMega Venu...| Grass| Poison| 80|   100|    123|    122|    120|   80|         1|    false|
|  4|          Charmander|  Fire|pokemon| 39|    52|     43|     60|     50|   65|         1|    false|
|  5|          Charmeleon|  Fire|pokemon| 58|    64|     58|     80|     65|   80|         1|    false|
|  6|           Charizard|  Fire| Flying| 78|    84|     78|    

### Order By in pyspark Data Frame

In [11]:
df_spark.orderBy(df_spark.Attack.desc(),df_spark.Defense.asc()).show(truncate=False)

+---+---------------------+--------+------+---+------+-------+-------+-------+-----+----------+---------+
|#  |Name                 |Type 1  |Type 2|HP |Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+---------------------+--------+------+---+------+-------+-------+-------+-----+----------+---------+
|516|Simipour             |Water   |null  |75 |98    |63     |98     |63     |101  |5         |FALSE    |
|514|Simisear             |Fire    |null  |75 |98    |63     |98     |63     |101  |5         |FALSE    |
|512|Simisage             |Grass   |null  |75 |98    |63     |98     |63     |101  |5         |FALSE    |
|631|Heatmor              |Fire    |null  |85 |97    |66     |105    |66     |65   |5         |FALSE    |
|372|Shelgon              |Dragon  |null  |65 |95    |100    |60     |50     |50   |3         |FALSE    |
|181|AmpharosMega Ampharos|Electric|Dragon|90 |95    |105    |165    |110    |45   |2         |FALSE    |
|75 |Graveler             |Rock    |Ground|55 

### Group by the Type of Pokemon and get the count of each pokemon type

In [6]:
df_spark.groupBy(df_spark["Type 1"]).count().collect()

[Row(Type 1='Water', count=112),
 Row(Type 1='Poison', count=28),
 Row(Type 1='Steel', count=27),
 Row(Type 1='Rock', count=44),
 Row(Type 1='Ice', count=24),
 Row(Type 1='Ghost', count=32),
 Row(Type 1='Fairy', count=17),
 Row(Type 1='Psychic', count=57),
 Row(Type 1='Dragon', count=32),
 Row(Type 1='Flying', count=4),
 Row(Type 1='Bug', count=69),
 Row(Type 1='Electric', count=44),
 Row(Type 1='Fire', count=52),
 Row(Type 1='Ground', count=32),
 Row(Type 1='Dark', count=31),
 Row(Type 1='Fighting', count=27),
 Row(Type 1='Grass', count=70),
 Row(Type 1='Normal', count=98)]

### Group by to get the max Attack value of every type of pokemon

In [10]:
df_spark.groupBy("Type 1").max("Attack").show()

+--------+-----------+
|  Type 1|max(Attack)|
+--------+-----------+
|   Water|        155|
|  Poison|        106|
|   Steel|        150|
|    Rock|        165|
|     Ice|        130|
|   Ghost|        165|
|   Fairy|        131|
| Psychic|        190|
|  Dragon|        180|
|  Flying|        115|
|     Bug|        185|
|Electric|        123|
|    Fire|        160|
|  Ground|        180|
|    Dark|        150|
|Fighting|        145|
|   Grass|        132|
|  Normal|        160|
+--------+-----------+



#### To get the average defense of each pokemon type

In [23]:
df_spark.groupBy("Type 1").avg("Defense").show()

+--------+------------------+
|  Type 1|      avg(Defense)|
+--------+------------------+
|   Water| 72.94642857142857|
|  Poison| 68.82142857142857|
|   Steel|126.37037037037037|
|    Rock|100.79545454545455|
|     Ice| 71.41666666666667|
|   Ghost|           81.1875|
|   Fairy| 65.70588235294117|
| Psychic|  67.6842105263158|
|  Dragon|            86.375|
|  Flying|             66.25|
|     Bug| 70.72463768115942|
|Electric| 66.29545454545455|
|    Fire| 67.76923076923077|
|  Ground|          84.84375|
|    Dark|  70.2258064516129|
|Fighting| 65.92592592592592|
|   Grass|              70.8|
|  Normal|  59.8469387755102|
+--------+------------------+



### Joins in pyspark

In [9]:
# we are importing new dataset which contains customer details for performing joins
df_customer = spark.read.csv('Customer_Details.csv',header=True,inferSchema=True)
df_customer

CustomerId,CustomerName,District
1,Sakthi,Tenkasi
2,Eyuwankg,Thoothukudi
3,Balaji,Chennai
4,Sriram,Tirunelveli
5,Kaviarasan,Tirunelveli
6,prathip,Chennai


In [7]:
# we are importing  new dataset which contains car order details 
df_car_order = spark.read.csv('CarOrderDetails.csv',header=True,inferSchema=True)
df_car_order

OrderId,Car Name,OrderDate,CustomerId
1,BMW iX,8/24/2022,3
2,Rolls Royce Ghost,8/25/2022,1
3,Audi RS7,8/26/2022,5
4,Lamborghini Huracan,8/27/2022,2
5,Mustang,8/28/2022,6
6,Ferrari,8/29/2022,4
7,Benz,8/30/2022,3
8,Jaguar,8/31/2022,5
9,Tesla Model x,9/1/2022,2
10,Buggati Chiron,9/2/2022,2


#### Perform Inner join both the data frames and get the Customer Name and Car Name

In [11]:
df_car_order.join(df_customer,df_customer.CustomerId == df_car_order.CustomerId,'inner').select(df_customer.CustomerName,df_car_order['Car Name'])

CustomerName,Car Name
Balaji,BMW iX
Sakthi,Rolls Royce Ghost
Kaviarasan,Audi RS7
Eyuwankg,Lamborghini Huracan
prathip,Mustang
Sriram,Ferrari
Balaji,Benz
Kaviarasan,Jaguar
Eyuwankg,Tesla Model x
Eyuwankg,Buggati Chiron


#### To get the distinct customer name who ordered a car

In [22]:
df_car_order.join(df_customer,df_customer.CustomerId == df_car_order.CustomerId,'inner').select(df_customer.CustomerName).distinct()

CustomerName
Eyuwankg
prathip
Kaviarasan
Sriram
Balaji
Sakthi


#### use Left Join on df_customer on df_car_order

In [14]:
df_customer.join(df_car_order,df_customer.CustomerId == df_car_order.CustomerId,'left').select(df_customer.CustomerId,df_customer.CustomerName,df_customer.District,df_car_order['Car Name'])

CustomerId,CustomerName,District,Car Name
1,Sakthi,Tenkasi,Rolls Royce Ghost
2,Eyuwankg,Thoothukudi,Buggati Chiron
2,Eyuwankg,Thoothukudi,Tesla Model x
2,Eyuwankg,Thoothukudi,Lamborghini Huracan
3,Balaji,Chennai,Benz
3,Balaji,Chennai,BMW iX
4,Sriram,Tirunelveli,Ferrari
5,Kaviarasan,Tirunelveli,Jaguar
5,Kaviarasan,Tirunelveli,Audi RS7
6,prathip,Chennai,Mustang


#### Right outer join on car order details with customer name

In [19]:
df_customer.join(df_car_order,df_customer.CustomerId == df_car_order.CustomerId,'right_outer').select(df_car_order.OrderId,df_customer.CustomerName,df_car_order['Car Name'])

OrderId,CustomerName,Car Name
1,Balaji,BMW iX
2,Sakthi,Rolls Royce Ghost
3,Kaviarasan,Audi RS7
4,Eyuwankg,Lamborghini Huracan
5,prathip,Mustang
6,Sriram,Ferrari
7,Balaji,Benz
8,Kaviarasan,Jaguar
9,Eyuwankg,Tesla Model x
10,Eyuwankg,Buggati Chiron


#### Other Joins in pyspark data frame

#### default `inner`. Must be one of: `inner`, `cross`, `outer`, `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti` and `left_anti`.

### List comprehension

#### To Use list comprehension to fetch the customer name and district


In [27]:
customerColumnList =['CustomerName','District'] 

In [24]:
df_customer.select([columnName for columnName in customerColumnList])

CustomerName,District
Sakthi,Tenkasi
Eyuwankg,Thoothukudi
Balaji,Chennai
Sriram,Tirunelveli
Kaviarasan,Tirunelveli
prathip,Chennai


#### using condition in list comprehension to select only CustomerName column from list

In [29]:
df_customer.select([columnName for columnName in customerColumnList if columnName=='CustomerName'])

CustomerName
Sakthi
Eyuwankg
Balaji
Sriram
Kaviarasan
prathip


#### using list comprehension to iterate through the data of customer and print them 

In [31]:
[print(row["CustomerId"],row["CustomerName"],row["District"]) for row in df_customer.collect()]

1 Sakthi Tenkasi
2 Eyuwankg Thoothukudi
3 Balaji Chennai
4 Sriram Tirunelveli
5 Kaviarasan Tirunelveli
6 prathip Chennai


[None, None, None, None, None, None]