In [0]:
from pyspark.sql import SparkSession

# Reuse the already-active SparkSession if there is one; otherwise build a new one
spark = SparkSession.builder.getOrCreate()

### Reading CSV or any file

CSV files carry no type information, so by default Spark reads every column as a string. Setting `inferSchema` to `True` makes Spark work out an appropriate data type for each column, but it has to make an additional pass over the data to do so, which is inefficient for larger datasets.

In [0]:
# Source CSV inside the volume; `path` is reused by the later schema examples
path = '/Volumes/population_metrics/landing/datasets/countries_dataset/csv_data/countries_population/countries_population.csv'

# Treat the first line as column names and skip type inference,
# so every column comes back as a string
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', False)
    .load(path)
)

# Preview the first few rows
df.limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID
210,Sri Lanka,Sri Lankan,LKA,LK,Sri Jayawardenapura Kotte,21323733,65610,30,30
211,Sudan,Sudanese,SDN,SD,Khartoum,42813238,1886068,50,40
212,Suriname,Surinamese,SUR,SR,Paramaribo,581372,163820,10,10
213,Svalbard and Jan Mayen,Svalbard,SJM,SJ,Longyearbyen,2667,61399,20,120
214,Sweden,Swedish,SWE,SE,Stockholm,10036379,450295,20,120


#### Understanding Data types or Schema of a Data Frame

In [0]:
# (column name, data type) pairs; every column is a string because no schema was supplied or inferred
df.dtypes

[('COUNTRY_ID', 'string'),
 ('NAME', 'string'),
 ('NATIONALITY', 'string'),
 ('COUNTRY_CODE', 'string'),
 ('ISO_ALPHA2', 'string'),
 ('CAPITAL', 'string'),
 ('POPULATION', 'string'),
 ('AREA_KM2', 'string'),
 ('REGION_ID', 'string'),
 ('SUB_REGION_ID', 'string')]

In [0]:
# Print the schema as a tree: column name, data type and nullability
df.printSchema()

root
 |-- COUNTRY_ID: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- NATIONALITY: string (nullable = true)
 |-- COUNTRY_CODE: string (nullable = true)
 |-- ISO_ALPHA2: string (nullable = true)
 |-- CAPITAL: string (nullable = true)
 |-- POPULATION: string (nullable = true)
 |-- AREA_KM2: string (nullable = true)
 |-- REGION_ID: string (nullable = true)
 |-- SUB_REGION_ID: string (nullable = true)



### Providing the Schema as a SQL string

In [0]:
# DDL-style schema: one "<column> <type>" entry per CSV column, listed in file order
sql_schema = '''
COUNTRY_ID INT,
NAME STRING,
NATIONALITY STRING,
COUNTRY_CODE STRING,
ISO_ALPHA2 STRING,
CAPITAL STRING,
POPULATION INT,
AREA_KM2 DOUBLE,
REGION_ID INT,
SUB_REGION_ID INT
'''

# With an explicit schema Spark does not need to infer types from the data.
# The header row is still skipped; by default the supplied schema is applied to the columns as given.
df_sql_schema = (
    spark.read
    .format('csv')
    .option('header', True)
    .schema(sql_schema)
    .load(path)
)
df_sql_schema.printSchema()

root
 |-- COUNTRY_ID: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- NATIONALITY: string (nullable = true)
 |-- COUNTRY_CODE: string (nullable = true)
 |-- ISO_ALPHA2: string (nullable = true)
 |-- CAPITAL: string (nullable = true)
 |-- POPULATION: integer (nullable = true)
 |-- AREA_KM2: double (nullable = true)
 |-- REGION_ID: integer (nullable = true)
 |-- SUB_REGION_ID: integer (nullable = true)



### Passing the Schema with Structs programmatically

In [0]:
from pyspark.sql import types as T

# Programmatic equivalent of a DDL schema string:
# one StructField(name, data type, nullable) per CSV column, in file order.
#
# NOTE: nullable=False on COUNTRY_ID only records the intent that it is the key column.
# It is NOT enforced by this CSV read: the printed schema still reports COUNTRY_ID as
# `nullable = true`, so uniqueness / non-null checks have to be done explicitly if needed.
struct_schema = T.StructType([
    T.StructField('COUNTRY_ID', T.IntegerType(), False),
    T.StructField('NAME', T.StringType(), True),
    T.StructField('NATIONALITY', T.StringType(), True),
    T.StructField('COUNTRY_CODE', T.StringType(), True),
    T.StructField('ISO_ALPHA2', T.StringType(), True),
    T.StructField('CAPITAL', T.StringType(), True),
    T.StructField('POPULATION', T.IntegerType(), True),
    T.StructField('AREA_KM2', T.DoubleType(), True),
    T.StructField('REGION_ID', T.IntegerType(), True),
    T.StructField('SUB_REGION_ID', T.IntegerType(), True)
])

# Backward-compatible alias for the original (misspelled) variable name
struct_chema = struct_schema

df_struct_schema = spark.read.format('csv').schema(struct_schema).options(header=True).load(path)
df_struct_schema.printSchema()

root
 |-- COUNTRY_ID: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- NATIONALITY: string (nullable = true)
 |-- COUNTRY_CODE: string (nullable = true)
 |-- ISO_ALPHA2: string (nullable = true)
 |-- CAPITAL: string (nullable = true)
 |-- POPULATION: integer (nullable = true)
 |-- AREA_KM2: double (nullable = true)
 |-- REGION_ID: integer (nullable = true)
 |-- SUB_REGION_ID: integer (nullable = true)



In [0]:
# Preview a few rows read with the StructType schema; AREA_KM2 now shows as a double (e.g. 65610.0)
df_struct_schema.limit(3).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID
210,Sri Lanka,Sri Lankan,LKA,LK,Sri Jayawardenapura Kotte,21323733,65610.0,30,30
211,Sudan,Sudanese,SDN,SD,Khartoum,42813238,1886068.0,50,40
212,Suriname,Surinamese,SUR,SR,Paramaribo,581372,163820.0,10,10


### Writing Data Frame into Different Formats

Can be written into formats like,
- JSON
- CSV
- ORC: Similar to Parquet. Applies a snappy compression automatically at the time of writing
- PARQUET
- DELTA

When writing into formats like JSON, CSV, etc. in the volume, additional marker files (such as _STARTED, _COMMITTED and _SUCCESS) will be created for each task id of the corresponding write.

In [0]:
# Persist the typed data frame as Parquet, replacing any output left by a previous run
(
    df_struct_schema.write
    .format('parquet')
    .mode('overwrite')
    .save('/Volumes/population_metrics/landing/datasets/output_data/parquet/')
)

In [0]:
# Sanity check: load the Parquet output written above back into a data frame and preview it
df_parquet = (
    spark.read
    .format('parquet')
    .load('/Volumes/population_metrics/landing/datasets/output_data/parquet/')
)
df_parquet.limit(4).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID
210,Sri Lanka,Sri Lankan,LKA,LK,Sri Jayawardenapura Kotte,21323733,65610.0,30,30
211,Sudan,Sudanese,SDN,SD,Khartoum,42813238,1886068.0,50,40
212,Suriname,Surinamese,SUR,SR,Paramaribo,581372,163820.0,10,10
213,Svalbard and Jan Mayen,Svalbard,SJM,SJ,Longyearbyen,2667,61399.0,20,120


### Partitioning Data

Data can be partitioned by using the `partitionBy` option at the time of writing into a data file format. For example, in real-world use cases data is commonly partitioned by date, so that each date's records are grouped into their own folder.

Partitioning can also be done with multiple columns, and it is applied in the order specified. For example, if `.partitionBy('REGION_ID', 'SUB_REGION_ID')` is given, the `REGION_ID=10` folder will contain a sub-folder for each sub-region present in that region's data.

In [0]:
# Write the data as CSV, split into one sub-folder per REGION_ID value.
# mode('overwrite') keeps this cell re-runnable: with the default save mode the write
# raises an error as soon as the target path already exists (e.g. on a second "Run All").
# This matches the Parquet write earlier in the notebook.
(
    df_struct_schema.write
    .format('csv')
    .mode('overwrite')
    .partitionBy('REGION_ID')
    .save('/Volumes/population_metrics/landing/datasets/output_data/csv_partitioned')
)