#### Ingest all_india_pincode_ds.csv

#### Read a CSV file with the DataFrameReader API and write it to Parquet with the DataFrameWriter API

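1. Read the CSV file into a DataFrame
2. Fix the header (use the first row of the file as column names)
3. Create an explicit schema
4. Rename columns
5. Drop columns
6. Add a new column (`load_date`)
7. Write the data to a single, unpartitioned Parquet output
8. Write the data to multiple Parquet files, partitioned by `state`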

In [0]:
dbutils.fs.mounts()

In [0]:
display(dbutils.fs.mounts())

In [0]:
pincode_df = spark.read.csv("/mnt/goverencedl/raw/all_india_pincode_ds.csv")

In [0]:
display(pincode_df)

In [0]:
pincode_df = spark.read.csv("/mnt/goverencedl/raw/all_india_pincode_ds.csv")

In [0]:
pincode_df = spark.read.option("header", True).csv("/mnt/goverencedl/raw/all_india_pincode_ds.csv")

In [0]:
type(pincode_df)

In [0]:
display(pincode_df)

In [0]:
display(pincode_df)

In [0]:
pincode_df_header = spark.read.option("header", True).csv("/mnt/goverencedl/raw/all_india_pincode_ds.csv")

In [0]:
pincode_df.printSchema()

In [0]:
display(pincode_df_header)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
pincode_df_header.printSchema()

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
pincode_schema = StructType (fields=[StructField("officename", StringType(), False),
                                     StructField("pincode", IntegerType(), False),
                                     StructField("officeType", StringType(), False),
                                     StructField("Deliverystatus", StringType(), False),
                                     StructField("divisionname", StringType(), False),
                                     StructField("regionname", StringType(), False),
                                     StructField("circlename", StringType(), False),
                                     StructField("Taluk", StringType(), False),
                                     StructField("Districtname", StringType(), False),
                                     StructField("statename", StringType(), False),
                                     StructField("Telephone", StringType(), False),
                                     StructField("Related Suboffice", StringType(), False),
                                     StructField("Related Headoffice", StringType(), False),
                                     StructField("longitude", StringType(), False),
                                     StructField("latitude", StringType(), False)
])

In [0]:
pincode_schema = StructType (fields=[StructField("officename", StringType(), False),
                                     StructField("pincode", IntegerType(), False),
                                     StructField("officeType", StringType(), False),
                                     StructField("Deliverystatus", StringType(), False),
                                     StructField("divisionname", StringType(), False),
                                     StructField("regionname", StringType(), False),
                                     StructField("circlename", StringType(), False),
                                     StructField("Taluk", StringType(), False),
                                     StructField("Districtname", StringType(), False),
                                     StructField("statename", StringType(), False),
                                     StructField("Related Suboffice", StringType(), False),
                                     StructField("Related Headoffice", StringType(), False),
                                     StructField("longitude", StringType(), False),
                                     StructField("latitude", StringType(), False)
])

In [0]:
pincode_df = spark.read \
    .option("header", True) \
    .schema(pincode_schema) \
    .csv("/mnt/goverencedl/raw/all_india_pincode_ds.csv")

In [0]:
pincode_df_header = spark.read \
    .option("header", True) \
    .schema(pincode_schema) \
    .csv("/mnt/goverencedl/raw/all_india_pincode_ds.csv")

In [0]:
pincode_df_renamed = pincode_df.withColumnRenamed("officename", "office") \
.withColumnRenamed("officeType", "type") \
.withColumnRenamed("Deliverystatus", "status") \
.withColumnRenamed("divisionname", "division") \
.withColumnRenamed("regionname", "region") \
.withColumnRenamed("Districtname", "district") \
.withColumnRenamed("statename", "state") \
.withColumnRenamed("Related Suboffice", "suboffice") \
.withColumnRenamed("Related Headoffice", "headoffice")

In [0]:
pincode_df_dropcolumn = pincode_df_renamed.select(pincode_df_renamed["office"], pincode_df_renamed["pincode"], pincode_df_renamed["type"], pincode_df_renamed["status"],
                                                  pincode_df_renamed["division"], pincode_df_renamed["region"], pincode_df_renamed["taluk"], pincode_df_renamed["district"],
                                                  pincode_df_renamed["state"], pincode_df_renamed["telephone"], pincode_df_renamed["suboffice"], pincode_df_renamed["headoffice"])

In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
pincode_df_final = pincode_df_dropcolumn.withColumn("load_date", current_timestamp())

In [0]:
display(pincode_df_final.printSchema())

In [0]:
display(pincode_df_final)

In [0]:
pincode_df_header.printSchema()

In [0]:
pincode_df_renamed = pincode_df_header.withColumnRenamed("officename", "office") \
.withColumnRenamed("officeType", "type") \
.withColumnRenamed("Deliverystatus", "status") \
.withColumnRenamed("divisionname", "division") \
.withColumnRenamed("regionname", "region") \
.withColumnRenamed("Districtname", "district") \
.withColumnRenamed("statename", "state") \
.withColumnRenamed("Telephone", "telephone") \
.withColumnRenamed("Related Suboffice", "suboffice") \
.withColumnRenamed("Related Headoffice", "headoffice")

In [0]:
pincode_df_final.write.parquet("/mnt/goverencedl/processed/pincode_master")

In [0]:
pincode_df_dropcolumn = pincode_df_renamed.select(pincode_df_renamed["office"], pincode_df_renamed["pincode"], pincode_df_renamed["type"], pincode_df_renamed["status"],
                                                  pincode_df_renamed["division"], pincode_df_renamed["region"], pincode_df_renamed["taluk"], pincode_df_renamed["district"],
                                                  pincode_df_renamed["state"], pincode_df_renamed["telephone"], pincode_df_renamed["suboffice"], pincode_df_renamed["headoffice"])     

In [0]:
pincode_df_final.write.partitionBy('state').parquet("/mnt/goverencedl/processed/pincode_master_partition")

In [0]:
from pyspark.sql.functions import current_timestamp, when

In [0]:
display(spark.read.parquet("/mnt/goverencedl/processed/pincode_master_partition"))

In [0]:
pincode_df_final = pincode_df_dropcolumn.withColumn("load_date", current_timestamp())                                        

In [0]:
pincode_df_final = pincode_df_final.withColumn("status", when(pincode_df_final.status == "Non-Delivery", "NonDelivery").otherwise(pincode_df_final.status))

In [0]:
display(pincode_df_final)

In [0]:
display(spark.read.parquet("/mnt/goverencedl/processed/pincode_master"))

In [0]:
pincode_df_final.write.parquet("/mnt/goverencedl/processed/pincode_master")

In [0]:
pincode_df_final.write.mode("overwrite").partitionBy('state').parquet("/mnt/goverencedl/processed/pincode_master_partition")

In [0]:
display(spark.read.parquet("/mnt/goverencedl/processed/pincode_master_partition").filter("state = 'BIHAR'"))