<a href="https://colab.research.google.com/github/jugalpanchal/bd-chef/blob/main/spark_files_io.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Install the dependencies:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java
!wget -q https://downloads.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz # download the Spark package
!tar xf spark-3.5.4-bin-hadoop3.tgz # extract the Spark package
!pip install -q findspark # install findspark, which makes the Spark installation importable from Python

# Set the location of Java and Spark:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

import findspark
findspark.init()

In [7]:
from pyspark.sql import SparkSession

# create or get spark session
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("Spark_App1") \
        .getOrCreate()

sc = spark.sparkContext

If the CSV file has a header (column names in the first row), set header=True. Spark then uses the first row as the DataFrame's column names. With header=False (the default), the DataFrame gets default column names: _c0, _c1, _c2, etc.
If the CSV file has a header row and we do not specify header=True, the header row is treated as a data row.

In [8]:
data = spark.read.csv('sample_data/california_housing_test.csv')

data
# DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]

#data.collect()
#[Row(_c0='longitude', _c1='latitude', _c2='housing_median_age', _c3='total_rooms', _c4='total_bedrooms', _c5='population', _c6='households', _c7='median_income', _c8='median_house_value'),
# Row(_c0='-122.050000', _c1='37.370000', _c2='27.000000', _c3='3885.000000', _c4='661.000000', _c5='1537.000000', _c6='606.000000', _c7='6.608500', _c8='344700.000000'),
# Row(_c0='-118.300000', _c1='34.260000', _c2='43.000000', _c3='1510.000000', _c4='310.000000', _c5='809.000000', _c6='277.000000', _c7='3.599000', _c8='176500.000000'),
# Row(_c0='-117.810000', _c1='33.780000', _c2='27.000000', _c3='3589.000000', _c4='507.000000', _c5='1484.000000', _c6='495.000000', _c7='5.793400', _c8='270500.000000')
# ...]


DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]

In [9]:
data = spark.read.csv('sample_data/california_housing_test.csv', header=True)

data
# DataFrame[longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

#data.collect()
#[Row(longitude='-122.050000', latitude='37.370000', housing_median_age='27.000000', total_rooms='3885.000000', total_bedrooms='661.000000', population='1537.000000', households='606.000000', median_income='6.608500', median_house_value='344700.000000'),
# Row(longitude='-118.300000', latitude='34.260000', housing_median_age='43.000000', total_rooms='1510.000000', total_bedrooms='310.000000', population='809.000000', households='277.000000', median_income='3.599000', median_house_value='176500.000000'),
# Row(longitude='-117.810000', latitude='33.780000', housing_median_age='27.000000', total_rooms='3589.000000', total_bedrooms='507.000000', population='1484.000000', households='495.000000', median_income='5.793400', median_house_value='270500.000000'),
# ...]

DataFrame[longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

inferSchema=True lets Spark decide each column's type based on the values in the file.
inferSchema=False (the default) reads every column as a string.

Inference is costly because it requires an extra pass over the data, so we can provide the schema manually instead. An explicit schema also helps when the file has no header row, since it supplies the column names.
https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe
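Providing the schema manually can be sketched as follows. This is a minimal example, not the only way to do it: it assumes every column is a double (which matches what inference reports for this file) and passes the schema as a DDL-formatted string. `COLUMNS`, `HOUSING_SCHEMA_DDL`, and `read_housing_csv` are hypothetical names introduced for illustration.

```python
# Column names taken from the header row of california_housing_test.csv.
COLUMNS = [
    "longitude", "latitude", "housing_median_age", "total_rooms",
    "total_bedrooms", "population", "households", "median_income",
    "median_house_value",
]

# DDL-formatted schema string: "longitude DOUBLE, latitude DOUBLE, ..."
# Assumption: all nine columns are doubles.
HOUSING_SCHEMA_DDL = ", ".join(f"{name} DOUBLE" for name in COLUMNS)


def read_housing_csv(spark, path):
    """Read the CSV with an explicit schema instead of inferSchema=True.

    Hypothetical helper; `spark` is an existing SparkSession.
    """
    return spark.read.csv(path, header=True, schema=HOUSING_SCHEMA_DDL)
```

With the session created earlier, `read_housing_csv(spark, 'sample_data/california_housing_test.csv')` should return a DataFrame with double columns without the extra inference pass.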

In [10]:
data = spark.read.csv('sample_data/california_housing_test.csv', header=True, inferSchema=True)

data
# DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

#data.collect()
#[Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0),
# Row(longitude=-118.3, latitude=34.26, housing_median_age=43.0, total_rooms=1510.0, total_bedrooms=310.0, population=809.0, households=277.0, median_income=3.599, median_house_value=176500.0),
# Row(longitude=-117.81, latitude=33.78, housing_median_age=27.0, total_rooms=3589.0, total_bedrooms=507.0, population=1484.0, households=495.0, median_income=5.7934, median_house_value=270500.0),
#...]


DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [11]:
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [12]:
filtered_data = data.select('longitude', 'latitude', 'median_house_value')
filtered_data # DataFrame[longitude: double, latitude: double, median_house_value: double]

#filtered_data.collect()
#[Row(longitude=-122.05, latitude=37.37, median_house_value=344700.0),
# Row(longitude=-118.3, latitude=34.26, median_house_value=176500.0),
# Row(longitude=-117.81, latitude=33.78, median_house_value=270500.0),
#...]


DataFrame[longitude: double, latitude: double, median_house_value: double]

### Save - write

In [13]:
# Spark writes one file per partition of the DataFrame.
# It creates a folder containing part-00000-<guid>.csv files and a _SUCCESS marker.
# Without header
filtered_data.write.csv("sample_data/california_housing_filtered_dataset_csv")

In [14]:
# Creates a folder containing a single part-00000-<guid>.csv file and a _SUCCESS marker.
# With header
# coalesce(1) merges all partitions into one, so only one part file is written.
filtered_data.coalesce(1).write.option('header', 'true')\
  .csv("sample_data/california_housing_filtered_single_csv")

In [19]:
# Spark writes one file per partition of the DataFrame.
# It creates a folder containing part-00000-<guid>.json files and a _SUCCESS marker.
# The records are not wrapped in a JSON array; each line holds one record. That is what we want, because MapReduce and Spark read each record as a separate entry.
filtered_data.write.json("sample_data/california_housing_filtered_json2")

# Note: when reading JSON, mode='PERMISSIVE' (the default) keeps corrupted records: their fields are set to null and the raw text goes into the '_corrupt_record' column.
# Note: mode='DROPMALFORMED' drops the corrupted records while reading.
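The point about records not being wrapped in an array can be illustrated without Spark. The sketch below uses only the standard `json` module and two sample rows from the dataset; it shows the one-record-per-line layout, not Spark's writer itself.

```python
import json

records = [
    {"longitude": -122.05, "latitude": 37.37, "median_house_value": 344700.0},
    {"longitude": -118.3, "latitude": 34.26, "median_house_value": 176500.0},
]

# One self-contained JSON object per line, with no enclosing array.
json_lines_text = "\n".join(json.dumps(r) for r in records)

# Each line parses on its own, so a file can be split at line boundaries
# and the pieces processed independently.
parsed = [json.loads(line) for line in json_lines_text.splitlines()]

# The text as a whole is not a single JSON document.
try:
    json.loads(json_lines_text)
    whole_text_is_valid_json = True
except json.JSONDecodeError:
    whole_text_is_valid_json = False
```

Because every line stands alone, a reader never needs to see the start or end of the file to interpret a record.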

In [20]:
# This repartitions the data into 10 partitions and writes 10 files.
# Up to 10 tasks can write in parallel, so the write can be faster.
filtered_data.repartition(10).write.json("sample_data/california_housing_filtered_json_repartition2")

In [26]:
filtered_data.show(5, False)

+---------+--------+------------------+
|longitude|latitude|median_house_value|
+---------+--------+------------------+
|-122.05  |37.37   |344700.0          |
|-118.3   |34.26   |176500.0          |
|-117.81  |33.78   |270500.0          |
|-118.36  |33.82   |330000.0          |
|-119.67  |36.33   |81700.0           |
+---------+--------+------------------+
only showing top 5 rows



In [24]:
# This will create partition folders like "median_house_value=3432432.0"
filtered_data.write.partitionBy("median_house_value") \
  .json("sample_data/california_housing_filtered_json_partitionby")

In [29]:
# The median_house_value column is not stored in the file itself; its value lives in the folder name :)
single_file_df = spark.read.json("/content/sample_data/california_housing_filtered_json_partitionby/median_house_value=100000.0/part-00000-2459eb2b-797e-4d34-9044-6322b67b2be9.c000.json")
single_file_df.show(5, False)

+--------+---------+
|latitude|longitude|
+--------+---------+
|39.14   |-123.21  |
|34.69   |-118.14  |
|34.06   |-117.18  |
|39.07   |-121.7   |
|32.75   |-117.08  |
+--------+---------+
only showing top 5 rows



In [30]:
# Reading a single partition folder gives the same result: no median_house_value column :)
single2_df = spark.read.json("/content/sample_data/california_housing_filtered_json_partitionby/median_house_value=100000.0/")
single2_df.show(5, False)

+--------+---------+
|latitude|longitude|
+--------+---------+
|39.14   |-123.21  |
|34.69   |-118.14  |
|34.06   |-117.18  |
|39.07   |-121.7   |
|32.75   |-117.08  |
+--------+---------+
only showing top 5 rows



In [31]:
# When we read from the root folder, Spark rebuilds the column from the partition folder names, so there is no need to store the partition column inside the files.
partition1_df = spark.read.json("/content/sample_data/california_housing_filtered_json_partitionby/")
partition1_df.show(5, False)

+--------+---------+------------------+
|latitude|longitude|median_house_value|
+--------+---------+------------------+
|34.15   |-118.06  |500001.0          |
|37.34   |-122.07  |500001.0          |
|33.02   |-117.18  |500001.0          |
|37.79   |-122.47  |500001.0          |
|37.56   |-122.34  |500001.0          |
+--------+---------+------------------+
only showing top 5 rows
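How a column value can be recovered from a `column=value` folder name can be sketched in plain Python. This is only an illustration of the naming convention, not Spark's own partition discovery; `partition_values` is a hypothetical helper and the shortened file name `part-00000.json` is made up for the example.

```python
from pathlib import PurePosixPath


def partition_values(path):
    """Collect key=value folder names from a path into a dict (values stay strings)."""
    values = {}
    for part in PurePosixPath(path).parts:
        if "=" in part:
            key, _, value = part.partition("=")
            values[key] = value
    return values


example = (
    "sample_data/california_housing_filtered_json_partitionby/"
    "median_house_value=100000.0/part-00000.json"  # hypothetical file name
)
print(partition_values(example))  # {'median_house_value': '100000.0'}
```

The value comes back as a string here; any conversion to a numeric type would be a separate step.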



In [None]:
# For S3, an extra library (such as hadoop-aws) must be installed; Spark can then read from 's3a://...' paths.
#file = sc.textFile('s3a://bucket1/file1.txt')

### Serializing (Pickling) and Deserializing (Unpickling)

In [None]:
# Pickle uses its own binary format, so the saved files are not human-readable.
sample1_rdd = sc.textFile("sample_data/california_housing_test.csv")
sample1_rdd.saveAsPickleFile("sample_data/cal_hus_test_pickle") # Spark decides the output partitioning on its own.

sample2_rdd = sc.pickleFile('sample_data/cal_hus_test_pickle')
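Why pickled output is not human-readable can be seen with Python's standard `pickle` module alone. This is a small sketch of the serialization round trip on one sample CSV line; it does not reproduce the file layout that `saveAsPickleFile` writes.

```python
import pickle

row = "-122.050000,37.370000,27.000000"

# Serializing (pickling) produces bytes in pickle's own binary format.
blob = pickle.dumps(row)

# Deserializing (unpickling) restores the original object.
restored = pickle.loads(blob)

print(type(blob).__name__)   # bytes
print(restored == row)       # True
```

The bytes carry pickle framing around the text, so they differ from the plain UTF-8 encoding of the same string.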

### Cache and Persist

In [None]:
# Spark automatically persists some intermediate data of shuffle operations, so the whole input is not recomputed if a node fails.
# cache(): for an RDD, stores it in memory only (same as persist() with no argument).
# persist(): takes a storage level (memory, disk, or both). Call unpersist() when the RDD is no longer needed.

from pyspark import StorageLevel  # MEMORY_ONLY lives on StorageLevel; the bare name raises NameError

sample2_rdd.persist(StorageLevel.MEMORY_ONLY)