## Working with Different Types of Data


### Step 1: Initialize PySpark Session


In [28]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Build the SparkSession for this notebook under the application name "day3";
# getOrCreate() hands back an already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("day3")
    .getOrCreate()
)


### Step 2: Load the Dataset


In [29]:
# Load the Titanic passenger dataset into a Spark DataFrame.
# header=True takes column names from the first row; inferSchema=True lets Spark
# guess each column's type from the data.
data_path = "../data/titanic.csv"  # Replace with the actual path
titanic_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Chipotle orders dataset into a Spark DataFrame
data_path = '../data/chipotle.csv' # Replace with the actual path
chipotle_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Load the Kalimati Tarkari dataset into a Spark DataFrame
data_path = '../data/kalimati_tarkari_dataset.csv' # Replace with the actual path
kalimati_df = spark.read.csv(data_path, header=True, inferSchema=True)


In [30]:
# printSchema() writes the schema tree to stdout itself and returns None, so it is
# called directly; wrapping the calls in print() would only add a "None None None" line.
for dataset in (titanic_df, chipotle_df, kalimati_df):
    dataset.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- item_name: string (nullable = true)
 |-- choice_description: string (nullable = true)
 |-- item_price: string (nullable = true)

root
 |-- SN: integer (nullable = true)
 |-- Commodity: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Minimum: double (nullable = true)
 |-- Maximum: double (nullable = true)
 |-- Average: double (nullable = true)

None None N

### Converting to Spark Types:

Question: Load the "titanic" dataset and convert the "Fare" column from double to integer.




In [58]:
from pyspark.sql.functions import col,when,avg,lit,instr,regexp_extract,sum,coalesce,struct,split,explode,create_map

In [32]:
# Replace the Fare column with the same values cast to the integer type,
# then print the schema to confirm the new column type.
titanic_df = titanic_df.withColumn(
    "Fare",
    titanic_df["Fare"].cast("int"),
)
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: integer (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### Working with Booleans:

Question: Load the "titanic" dataset and add a new column "IsAdult" that indicates whether a passenger is an adult (age >= 18) or not.

In [33]:
# Flag a passenger as an adult when Age is at least 18.
# Rows with a null Age do not satisfy the condition, fall through to otherwise(),
# and are therefore flagged False as well.
is_adult = when(col("Age") >= 18, lit(True)).otherwise(lit(False))
titanic_df_m = titanic_df.withColumn("isAdult", is_adult)
titanic_df_m.select("PassengerId", "Name", "Age", "isAdult").show()

+-----------+--------------------+----+-------+
|PassengerId|                Name| Age|isAdult|
+-----------+--------------------+----+-------+
|          1|Braund, Mr. Owen ...|22.0|   true|
|          2|Cumings, Mrs. Joh...|38.0|   true|
|          3|Heikkinen, Miss. ...|26.0|   true|
|          4|Futrelle, Mrs. Ja...|35.0|   true|
|          5|Allen, Mr. Willia...|35.0|   true|
|          6|    Moran, Mr. James|null|  false|
|          7|McCarthy, Mr. Tim...|54.0|   true|
|          8|Palsson, Master. ...| 2.0|  false|
|          9|Johnson, Mrs. Osc...|27.0|   true|
|         10|Nasser, Mrs. Nich...|14.0|  false|
|         11|Sandstrom, Miss. ...| 4.0|  false|
|         12|Bonnell, Miss. El...|58.0|   true|
|         13|Saundercock, Mr. ...|20.0|   true|
|         14|Andersson, Mr. An...|39.0|   true|
|         15|Vestrom, Miss. Hu...|14.0|  false|
|         16|Hewlett, Mrs. (Ma...|55.0|   true|
|         17|Rice, Master. Eugene| 2.0|  false|
|         18|Williams, Mr. Cha...|null| 

### Working with Numbers:

Question: Load the "titanic" dataset and calculate the average age of male and female passengers separately.

In [34]:
# Group the passengers by Sex and compute the mean Age of each group;
# the aggregated column is aliased to "AvgAge".
titanic_avg = (
    titanic_df
    .groupBy("Sex")
    .agg(avg("Age").alias("AvgAge"))
)
titanic_avg.show()

+------+------------------+
|   Sex|            AvgAge|
+------+------------------+
|female|27.915708812260537|
|  male| 30.72664459161148|
+------+------------------+



### Working with Strings:

Question: Load the "chipotle" dataset and find the item names containing the word "Chicken."

In [35]:
# Keep the rows whose item_name contains the substring "Chicken".
# Column.contains() expresses this substring test directly. (instr() would also work,
# but it returns the 1-based position of the first occurrence -- 0 when absent --
# not the number of occurrences.)
# where() already keeps every column, so no select('*') is needed.

chipotle_df_chicken = chipotle_df.where(col("item_name").contains("Chicken"))
chipotle_df_chicken.show()

+---+--------+--------+--------------------+--------------------+----------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|
+---+--------+--------+--------------------+--------------------+----------+
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |
|  5|       3|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $10.98 |
| 11|       6|       1|Chicken Crispy Tacos|[Roasted Chili Co...|    $8.75 |
| 12|       6|       1|  Chicken Soft Tacos|[Roasted Chili Co...|    $8.75 |
| 13|       7|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $11.25 |
| 16|       8|       1|     Chicken Burrito|[Tomatillo-Green ...|    $8.49 |
| 17|       9|       1|     Chicken Burrito|[Fresh Tomato Sal...|    $8.49 |
| 19|      10|       1|        Chicken Bowl|[Tomatillo Red Ch...|    $8.75 |
| 23|      12|       1|     Chicken Burrito|[[Tomatillo-Green...|   $10.98 |
| 26|      13|       1|        Chicken Bowl|[Roasted Chili Co...|    $8.49 |

23/08/31 18:50:24 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/Desktop/sparkdemo/data/chipotle.csv


### Regular Expressions:

Question: Load the "chipotle" dataset and find the items with names that start with "Ch" followed by any character.



In [36]:
# regex_pattern holds the pattern to test: '^' anchors at the start of the name,
# "Ch" must follow, and '.' requires one more character of any kind.
# Column.rlike() keeps the rows whose item_name matches the pattern, so there is no
# need to extract the matched text and compare it against an empty string.

regex_pattern ='^Ch.'
chipotle_ch = chipotle_df.filter(col("item_name").rlike(regex_pattern))
chipotle_ch.show()

+---+--------+--------+--------------------+--------------------+----------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|
+---+--------+--------+--------------------+--------------------+----------+
|  0|       1|       1|Chips and Fresh T...|                null|    $2.39 |
|  3|       1|       1|Chips and Tomatil...|                null|    $2.39 |
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |
|  5|       3|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $10.98 |
| 10|       5|       1| Chips and Guacamole|                null|    $4.45 |
| 11|       6|       1|Chicken Crispy Tacos|[Roasted Chili Co...|    $8.75 |
| 12|       6|       1|  Chicken Soft Tacos|[Roasted Chili Co...|    $8.75 |
| 13|       7|       1|        Chicken Bowl|[Fresh Tomato Sal...|   $11.25 |
| 14|       7|       1| Chips and Guacamole|                null|    $4.45 |
| 15|       8|       1|Chips and Tomatil...|                null|    $2.39 |

23/08/31 18:50:24 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/Desktop/sparkdemo/data/chipotle.csv


### Working with Nulls in Data:

Question: Load the "titanic" dataset and count the number of passengers with missing age information.



In [37]:
# isNull() gives a boolean per row; cast to int it becomes 1 for a missing age and
# 0 otherwise, so summing that column counts the passengers without an age.
# NOTE: `sum` here is the Spark aggregate imported from pyspark.sql.functions,
# not the Python builtin.
# first() fetches the single aggregated row and [0] picks its only value, so a
# plain number is printed rather than a one-cell table.

missing_age_flag = col("Age").isNull().cast("int")
titanic_null = titanic_df.select(sum(missing_age_flag).alias("missing_sum_age"))
print("Number of Passengers with missing Age: ", titanic_null.first()[0])

Number of Passengers with missing Age:  177


### Coalesce
Question: Utilizing the Chipotle dataset, use the coalesce function to combine the "item_name" and "choice_description" columns into a new column named "OrderDetails." Display the first 5 rows of the resulting DataFrame.


In [38]:
# coalesce() yields, for each row, the first non-null value among its arguments:
#   - item_name when it is present,
#   - otherwise choice_description,
#   - and null only when both are null.
# The result is stored in the new OrderDetails column.

order_details = coalesce(chipotle_df["item_name"], chipotle_df["choice_description"])
chipotle_coal = chipotle_df.withColumn('OrderDetails', order_details)
chipotle_coal.show(5)

+---+--------+--------+--------------------+--------------------+----------+--------------------+
|_c0|order_id|quantity|           item_name|  choice_description|item_price|        OrderDetails|
+---+--------+--------+--------------------+--------------------+----------+--------------------+
|  0|       1|       1|Chips and Fresh T...|                null|    $2.39 |Chips and Fresh T...|
|  1|       1|       1|                Izze|        [Clementine]|    $3.39 |                Izze|
|  2|       1|       1|    Nantucket Nectar|             [Apple]|    $3.39 |    Nantucket Nectar|
|  3|       1|       1|Chips and Tomatil...|                null|    $2.39 |Chips and Tomatil...|
|  4|       2|       2|        Chicken Bowl|[Tomatillo-Red Ch...|   $16.98 |        Chicken Bowl|
+---+--------+--------+--------------------+--------------------+----------+--------------------+
only showing top 5 rows



23/08/31 18:50:24 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, quantity, item_name, choice_description, item_price
 Schema: _c0, order_id, quantity, item_name, choice_description, item_price
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/Desktop/sparkdemo/data/chipotle.csv


### ifnull, nullIf, nvl, and nvl2

Question: Replace the null values in the "Age" column of the Titanic dataset with the average age.

In [39]:
# Compute the mean age as a plain Python value, then substitute it wherever Age is
# null; rows that already have an Age keep it.
# The ifnull / nullIf / nvl / nvl2 functions named in the heading are not used in
# this cell; they can be used through Spark SQL.


average_age = titanic_df.agg(avg("Age")).first()[0]
titanic_df_avg_age = titanic_df.withColumn(
    'Age',
    coalesce(col("Age"), lit(average_age)),
)
titanic_df_avg_age.show()

+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|              Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|             22.0|    1|    0|       A/5 21171|   7| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|             38.0|    1|    0|        PC 17599|  71|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|             26.0|    0|    0|STON/O2. 3101282|   7| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|             35.0|    1|    0|          113803|  53| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|             35.0|    0|    0|          373450|   8| null|       S|


In [40]:
# Replace missing ages with the average age in Spark SQL.
# ifnull(expr, fallback) returns `fallback` whenever `expr` is null, so wrapping the
# Age column itself performs the replacement; the fallback is a scalar subquery that
# computes the average age (avg() skips null values).


titanic_df.createOrReplaceTempView("titanic_view")

# Average age as a plain Python value, kept available for inspection.
avg_age = spark.sql("select avg(Age) from titanic_view").collect()[0][0]

# Build the select list so every column keeps its original position;
# only Age is wrapped in ifnull().
age_filled = "ifnull(Age, (select avg(Age) from titanic_view)) as Age"
select_list = ", ".join(
    age_filled if name == "Age" else "`" + name + "`"
    for name in titanic_df.columns
)
query = "select " + select_list + " from titanic_view"

titanic_sql = spark.sql(query)
titanic_df_avg_age = titanic_sql
titanic_df_avg_age.show()

+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|              Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|             22.0|    1|    0|       A/5 21171|   7| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|             38.0|    1|    0|        PC 17599|  71|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|             26.0|    0|    0|STON/O2. 3101282|   7| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|             35.0|    1|    0|          113803|  53| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|             35.0|    0|    0|          373450|   8| null|       S|


### drop

Question: Remove the "Cabin" column from the Titanic dataset.


In [41]:
# drop() returns a new DataFrame without the listed column(s);
# titanic_df itself is left untouched.
columns_to_drop = ["Cabin"]
titanic_cabin_removed = titanic_df.drop(*columns_to_drop)
titanic_cabin_removed.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|  53|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877|   8|       Q|
|          7|       0|     1|McCarthy, Mr. Tim

### fill

Question: Fill the null values in the "Age" column of the Titanic dataset with a default age of 30.

In [43]:
# Substitute 30 for every null in the Age column only; `subset` restricts the fill
# so nulls in other columns are left as they are.
titanic_df_filled = titanic_df.fillna(30, subset=["Age"])
titanic_df_filled.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|  53| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|30.0|    0|    0|          330877|   8| null|  

###  replace

Question: Replace the gender "male" with "M" and "female" with "F" in the "Sex" column of the Titanic dataset.

In [46]:
# Map each value to be replaced (key) to its replacement (value).
# replace() applies the mapping, and `subset` limits it to the "Sex" column.

replacement = dict(male="M", female="F")

titanic_df_m_f = titanic_df.na.replace(replacement, subset="Sex")
titanic_df_m_f.show()

+-----------+--------+------+--------------------+---+----+-----+-----+----------------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|Sex| Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+---+----+-----+-----+----------------+----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  M|22.0|    1|    0|       A/5 21171|   7| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|  F|38.0|    1|    0|        PC 17599|  71|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|  F|26.0|    0|    0|STON/O2. 3101282|   7| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|  F|35.0|    1|    0|          113803|  53| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  M|35.0|    0|    0|          373450|   8| null|       S|
|          6|       0|     3|    Moran, Mr. James|  M|null|    0|    0|          330877|   8| null|       Q|
|          7|      

### Working with Complex Types: Structs

Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "PriceRange" that is a struct containing "Minimum" and "Maximum" prices for each commodity.

In [49]:
# struct() (from pyspark.sql.functions) packs the given columns into one column
# whose value in every row is a record with one field per source column.
# Here each PriceRange value holds that row's Minimum and Maximum prices.

price_range = struct("Minimum", "Maximum")
kalimati_df_pricerange = kalimati_df.withColumn("PriceRange", price_range)

kalimati_df_pricerange.show()

+---+--------------------+----------+----+-------+-------+-------+------------+
| SN|           Commodity|      Date|Unit|Minimum|Maximum|Average|  PriceRange|
+---+--------------------+----------+----+-------+-------+-------+------------+
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|{35.0, 40.0}|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|{26.0, 32.0}|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|{20.0, 21.0}|
|  3|        Potato White|2013-06-16|  Kg|   15.0|   16.0|   15.5|{15.0, 16.0}|
|  4|  Onion Dry (Indian)|2013-06-16|  Kg|   28.0|   30.0|   29.0|{28.0, 30.0}|
|  5|       Carrot(Local)|2013-06-16|  Kg|   30.0|   35.0|   32.5|{30.0, 35.0}|
|  6|      Cabbage(Local)|2013-06-16|  Kg|    6.0|   10.0|    8.0| {6.0, 10.0}|
|  7|         Cauli Local|2013-06-16|  Kg|   30.0|   35.0|   32.5|{30.0, 35.0}|
|  8|         Raddish Red|2013-06-16|  Kg|   35.0|   40.0|   37.5|{35.0, 40.0}|
|  9|Raddish White(Local)|2013-06-16|  K

### Working with Complex Types: Arrays
Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "CommodityList" that is an array of all the commodities.


In [53]:
# split() (from pyspark.sql.functions) breaks each Commodity string on single spaces
# and returns the pieces as an array, stored in the new CommodityList column.


commodity_words = split(kalimati_df["Commodity"], " ")
kalimati_df_split = kalimati_df.withColumn('CommodityList', commodity_words)

kalimati_df_split.show()

+---+--------------------+----------+----+-------+-------+-------+--------------------+
| SN|           Commodity|      Date|Unit|Minimum|Maximum|Average|       CommodityList|
+---+--------------------+----------+----+-------+-------+-------+--------------------+
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|[Tomato, Big(Nepa...|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|[Tomato, Small(Lo...|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|       [Potato, Red]|
|  3|        Potato White|2013-06-16|  Kg|   15.0|   16.0|   15.5|     [Potato, White]|
|  4|  Onion Dry (Indian)|2013-06-16|  Kg|   28.0|   30.0|   29.0|[Onion, Dry, (Ind...|
|  5|       Carrot(Local)|2013-06-16|  Kg|   30.0|   35.0|   32.5|     [Carrot(Local)]|
|  6|      Cabbage(Local)|2013-06-16|  Kg|    6.0|   10.0|    8.0|    [Cabbage(Local)]|
|  7|         Cauli Local|2013-06-16|  Kg|   30.0|   35.0|   32.5|      [Cauli, Local]|
|  8|         Raddish Red|2013-0

In [55]:
# Alternative using select(): keep every existing column ('*') and append the
# split expression, aliased to "CommodityList".


kalimati_df_split_2 = kalimati_df.select(
    '*',
    split("Commodity", " ").alias("CommodityList"),
)
kalimati_df_split_2.show()

+---+--------------------+----------+----+-------+-------+-------+--------------------+
| SN|           Commodity|      Date|Unit|Minimum|Maximum|Average|       CommodityList|
+---+--------------------+----------+----+-------+-------+-------+--------------------+
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|[Tomato, Big(Nepa...|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|[Tomato, Small(Lo...|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|       [Potato, Red]|
|  3|        Potato White|2013-06-16|  Kg|   15.0|   16.0|   15.5|     [Potato, White]|
|  4|  Onion Dry (Indian)|2013-06-16|  Kg|   28.0|   30.0|   29.0|[Onion, Dry, (Ind...|
|  5|       Carrot(Local)|2013-06-16|  Kg|   30.0|   35.0|   32.5|     [Carrot(Local)]|
|  6|      Cabbage(Local)|2013-06-16|  Kg|    6.0|   10.0|    8.0|    [Cabbage(Local)]|
|  7|         Cauli Local|2013-06-16|  Kg|   30.0|   35.0|   32.5|      [Cauli, Local]|
|  8|         Raddish Red|2013-0

### Working with Complex Types: explode

Question: Explode the "CommodityList" array column from the previous step to generate a new row for each commodity in the list.

In [56]:
# explode() emits one output row per element of the CommodityList array.
# The frame is rebuilt from the unmodified kalimati_df rather than from
# kalimati_df_split itself, so re-running this cell does not explode an already
# exploded frame and multiply the rows again.
kalimati_df_split = (
    kalimati_df
    .withColumn("CommodityList", split(col("Commodity"), " "))
    .withColumn("Exploded", explode(col("CommodityList")))
)
kalimati_df_split.show()

+---+--------------------+----------+----+-------+-------+-------+--------------------+--------------+
| SN|           Commodity|      Date|Unit|Minimum|Maximum|Average|       CommodityList|      Exploded|
+---+--------------------+----------+----+-------+-------+-------+--------------------+--------------+
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|[Tomato, Big(Nepa...|        Tomato|
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|[Tomato, Big(Nepa...|   Big(Nepali)|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|[Tomato, Small(Lo...|        Tomato|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|[Tomato, Small(Lo...|  Small(Local)|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|       [Potato, Red]|        Potato|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|       [Potato, Red]|           Red|
|  3|        Potato White|2013-06-16|  Kg|   15.0|   16.0|   15.5|     [P

### Working with Complex Types: Maps

Question: Create a new DataFrame from the Kalimati Tarkari dataset, including a new column "PriceMap" that is a map with "Commodity" as the key and "Average" price as the value.
Answer:

In [67]:


# create_map() takes alternating key and value columns; here it builds, for every row,
# a single-entry map keyed by that row's Commodity with its Average price as the value.
price_map = create_map("Commodity", "Average")
kalimati_df_map = kalimati_df.withColumn(
    "PriceMap",
    price_map,
)
kalimati_df_map.show()


Column<'map(Commodity, Average)[key]'>
+---+--------------------+----------+----+-------+-------+-------+--------------------+
| SN|           Commodity|      Date|Unit|Minimum|Maximum|Average|            PriceMap|
+---+--------------------+----------+----+-------+-------+-------+--------------------+
|  0|  Tomato Big(Nepali)|2013-06-16|  Kg|   35.0|   40.0|   37.5|{Tomato Big(Nepal...|
|  1| Tomato Small(Local)|2013-06-16|  Kg|   26.0|   32.0|   29.0|{Tomato Small(Loc...|
|  2|          Potato Red|2013-06-16|  Kg|   20.0|   21.0|   20.5|{Potato Red -> 20.5}|
|  3|        Potato White|2013-06-16|  Kg|   15.0|   16.0|   15.5|{Potato White -> ...|
|  4|  Onion Dry (Indian)|2013-06-16|  Kg|   28.0|   30.0|   29.0|{Onion Dry (India...|
|  5|       Carrot(Local)|2013-06-16|  Kg|   30.0|   35.0|   32.5|{Carrot(Local) ->...|
|  6|      Cabbage(Local)|2013-06-16|  Kg|    6.0|   10.0|    8.0|{Cabbage(Local) -...|
|  7|         Cauli Local|2013-06-16|  Kg|   30.0|   35.0|   32.5|{Cauli Local ->

### Working with JSON

Question: Convert the "kalimati_df" DataFrame to JSON format and write it to a JSON file.

In [70]:
# toJSON() turns each row into one JSON string; collect() brings all of them to the
# driver as a Python list, so this is only suitable for data that fits in driver memory.
kalimati_json = kalimati_df.toJSON().collect()
filename = "Kalimati.json"

# Write one JSON object per line. The encoding is set explicitly so that non-ASCII
# text is written as UTF-8 regardless of the platform's default encoding.
with open(filename, "w", encoding="utf-8") as f:
    f.writelines(json_row + "\n" for json_row in kalimati_json)

print("Data written to", filename)

23/09/01 08:16:51 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:641)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1111)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
	at java.base/j

Data written to Kalimati.json


23/09/01 08:17:00 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:611)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$1(BlockManagerMasterEndpoint.scala:610)
	at org.apache.spar

In [72]:
# Turn the collected JSON strings back into an RDD and let Spark read it as JSON,
# producing a DataFrame from the JSON documents.
json_rdd = spark.sparkContext.parallelize(kalimati_json)
kalimati_json_df = spark.read.json(json_rdd)
kalimati_json_df.show()

23/09/01 08:19:24 WARN TaskSetManager: Stage 91 contains a task of very large size (2849 KiB). The maximum recommended task size is 1000 KiB.
23/09/01 08:19:26 WARN TaskSetManager: Stage 92 contains a task of very large size (2849 KiB). The maximum recommended task size is 1000 KiB.


+-------+--------------------+----------+-------+-------+---+----+
|Average|           Commodity|      Date|Maximum|Minimum| SN|Unit|
+-------+--------------------+----------+-------+-------+---+----+
|   37.5|  Tomato Big(Nepali)|2013-06-16|   40.0|   35.0|  0|  Kg|
|   29.0| Tomato Small(Local)|2013-06-16|   32.0|   26.0|  1|  Kg|
|   20.5|          Potato Red|2013-06-16|   21.0|   20.0|  2|  Kg|
|   15.5|        Potato White|2013-06-16|   16.0|   15.0|  3|  Kg|
|   29.0|  Onion Dry (Indian)|2013-06-16|   30.0|   28.0|  4|  Kg|
|   32.5|       Carrot(Local)|2013-06-16|   35.0|   30.0|  5|  Kg|
|    8.0|      Cabbage(Local)|2013-06-16|   10.0|    6.0|  6|  Kg|
|   32.5|         Cauli Local|2013-06-16|   35.0|   30.0|  7|  Kg|
|   37.5|         Raddish Red|2013-06-16|   40.0|   35.0|  8|  Kg|
|   27.5|Raddish White(Local)|2013-06-16|   30.0|   25.0|  9|  Kg|
|   17.0|        Brinjal Long|2013-06-16|   18.0|   16.0| 10|  Kg|
|   21.0|       Brinjal Round|2013-06-16|   22.0|   20.0| 11| 

23/09/01 08:19:30 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:611)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$1(BlockManagerMasterEndpoint.scala:610)
	at org.apache.spar