#  Read CSV

In [1]:
from pyspark.sql import SparkSession

In [2]:
mycsv = "/home/fdic_20200331/All_Reports_20200331_Income and Expense.csv"  # Should be some file on your system

In [3]:
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

In [4]:
csvdataDF = spark.read.load(mycsv, format="csv", sep=",", inferSchema="true", header="true")
csvdataDF.registerTempTable("incomes")

In [18]:
print("CSV data count ", csvdataDF.count())

CSV data count  5125


In [6]:
csvdataDF.show()

+-----+------+--------+-------+--------------------+----------------+-----+-----+---------+--------+-------+--------------------+--------------------+------+------+------+-------+--------+-------------+----------+--------------------+----------+----------+----------+------+-------+-----+-------+--------+-------------+-------------+--------------+-------------+------------+---------+-----+---+--------------------+------+-------+-----+------+-----+------+-------+-------+--------+-----+-----+--------+------+-------+------+----+-------+-----+------+-------+-------+------+------+--------+-------+
| cert|docket|fed_rssd|rssdhcr|                name|            city|stalp|  zip|   repdte| rundate|bkclass|             address|             namehcr|offdom|offfor|stmult|specgrp|subchaps|       county|cbsa_metro|     cbsa_metro_name|    estymd|   insdate|   effdate|mutual|parcert|trust|regagnt|insagnt1|      fdicdbs|     fdicsupv|        fldoff|          fed|     occdist| otsregnm|offoa| cb|      

In [30]:
csvdataDF.printSchema()

root
 |-- cert: integer (nullable = true)
 |-- docket: integer (nullable = true)
 |-- fed_rssd: integer (nullable = true)
 |-- rssdhcr: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- stalp: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- repdte: string (nullable = true)
 |-- rundate: string (nullable = true)
 |-- bkclass: string (nullable = true)
 |-- address: string (nullable = true)
 |-- namehcr: string (nullable = true)
 |-- offdom: integer (nullable = true)
 |-- offfor: integer (nullable = true)
 |-- stmult: string (nullable = true)
 |-- specgrp: integer (nullable = true)
 |-- subchaps: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- cbsa_metro: integer (nullable = true)
 |-- cbsa_metro_name: string (nullable = true)
 |-- estymd: string (nullable = true)
 |-- insdate: string (nullable = true)
 |-- effdate: string (nullable = true)
 |-- mutual: string (nullable = true)
 |-- parcert: intege

In [7]:
filteredCSVDF = spark.sql("select * from incomes where city='York'")

In [8]:
filteredCSVDF.show()

+-----+------+--------+-------+--------------------+----+-----+-----+---------+--------+-------+--------------------+--------------------+------+------+------+-------+--------+------+----------+--------------------+----------+----------+---------+------+-------+-----+-------+--------+-----------+-----------+------------+------------+------------+---------+-----+---+--------------------+------+-------+-----+------+-----+------+-------+-------+--------+-----+----+--------+------+-------+------+----+-------+-----+------+-------+-------+------+------+--------+--------+
| cert|docket|fed_rssd|rssdhcr|                name|city|stalp|  zip|   repdte| rundate|bkclass|             address|             namehcr|offdom|offfor|stmult|specgrp|subchaps|county|cbsa_metro|     cbsa_metro_name|    estymd|   insdate|  effdate|mutual|parcert|trust|regagnt|insagnt1|    fdicdbs|   fdicsupv|      fldoff|         fed|     occdist| otsregnm|offoa| cb|        inst.webaddr|intinc|eintexp|  nim|elnatr|nonii|ifidu

# Non-partitioned Parquet

## Fresh data

In [9]:
csvdataDF.write.format("parquet").mode("overwrite").save("income_nonpartitioned.parquet")

In [10]:
pqDF = spark.read.load("income_nonpartitioned.parquet", format="parquet")
print("PQ Non-partitioned partitioned count ", pqDF.count())

PQ Non-partitioned partitioned count  5125


In [11]:
pqDF.show()

+-----+------+--------+-------+--------------------+----------------+-----+-----+---------+--------+-------+--------------------+--------------------+------+------+------+-------+--------+-------------+----------+--------------------+----------+----------+----------+------+-------+-----+-------+--------+-------------+-------------+--------------+-------------+------------+---------+-----+---+--------------------+------+-------+-----+------+-----+------+-------+-------+--------+-----+-----+--------+------+-------+------+----+-------+-----+------+-------+-------+------+------+--------+-------+
| cert|docket|fed_rssd|rssdhcr|                name|            city|stalp|  zip|   repdte| rundate|bkclass|             address|             namehcr|offdom|offfor|stmult|specgrp|subchaps|       county|cbsa_metro|     cbsa_metro_name|    estymd|   insdate|   effdate|mutual|parcert|trust|regagnt|insagnt1|      fdicdbs|     fdicsupv|        fldoff|          fed|     occdist| otsregnm|offoa| cb|      

## Appending data

In [12]:
csvdataDF.write.format("parquet").mode("append").save("income_nonpartitioned.parquet")

In [13]:
pqDF = spark.read.load("income_nonpartitioned.parquet", format="parquet")
print("PQ Non-partitioned partitioned count ", pqDF.count())

PQ Non-partitioned partitioned count  10250


# Partitioned Parquet file

# Fresh data

.mode(...).option("compression", "gzip") can be used for gzip compression

In [19]:
csvdataDF.write.partitionBy("city").format("parquet").mode("overwrite").option("compression", "gzip").save("income_part.parquet")

In [20]:
pqDF = spark.read.load("income_part.parquet", format="parquet")
pqDF.registerTempTable("pqincomes")
print("PQ Non-partitioned partitioned count ", pqDF.count())

PQ Non-partitioned partitioned count  5125


In [21]:
filteredCSVDF = spark.sql("select * from pqincomes where city='York'")
filteredCSVDF.show()

+-----+------+--------+-------+--------------------+-----+-----+---------+--------+-------+--------------------+--------------------+------+------+------+-------+--------+------+----------+--------------------+----------+----------+---------+------+-------+-----+-------+--------+-----------+-----------+------------+------------+------------+---------+-----+---+--------------------+------+-------+-----+------+-----+------+-------+-------+--------+-----+----+--------+------+-------+------+----+-------+-----+------+-------+-------+------+------+--------+--------+----+
| cert|docket|fed_rssd|rssdhcr|                name|stalp|  zip|   repdte| rundate|bkclass|             address|             namehcr|offdom|offfor|stmult|specgrp|subchaps|county|cbsa_metro|     cbsa_metro_name|    estymd|   insdate|  effdate|mutual|parcert|trust|regagnt|insagnt1|    fdicdbs|   fdicsupv|      fldoff|         fed|     occdist| otsregnm|offoa| cb|        inst.webaddr|intinc|eintexp|  nim|elnatr|nonii|ifiduc|ise