<h1 align='center'>Lets Start working with CSV file in Pyspark</h1>

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

* Let start try to read the CSV file

In [2]:
df_csv = spark.read.csv("data/annual1.csv")

* using printSchema we can check the dataframe Schema

In [3]:
df_csv.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



* from above example reads the data into DataFrame columns "_c0" for the first column and "_c1" for the second and so on.
* In input file and have header then you have to explicitly need to mention True as we did in the below code script.

In [4]:
df_csv_header = spark.read.csv("data/annual1.csv", header=True)

In [5]:
df_csv_header.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Industry_aggregation_NZSIOC: string (nullable = true)
 |-- Industry_code_NZSIOC: string (nullable = true)
 |-- Industry_name_NZSIOC: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- Variable_code: string (nullable = true)
 |-- Variable_name: string (nullable = true)
 |-- Variable_category: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Industry_code_ANZSIC06: string (nullable = true)



* Read multiple CSV file in Pyspark

In [7]:
final_path = ["C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv", "C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual2.csv"]

df_csv_multiple = spark.read.csv(final_path,sep=',',
                       inferSchema=True, header=True)

In [8]:
df_csv_multiple.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Industry_aggregation_NZSIOC: string (nullable = true)
 |-- Industry_code_NZSIOC: string (nullable = true)
 |-- Industry_name_NZSIOC: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- Variable_code: string (nullable = true)
 |-- Variable_name: string (nullable = true)
 |-- Variable_category: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Industry_code_ANZSIC06: string (nullable = true)



* Read all the file of the directory

In [None]:
path ="C:/Users/RajanSahu/Desktop/study/Pyspark/data/*.csv"
df_csv_directory =spark.read.csv(path)

In [None]:
df_csv_directory.show()

* Lets see about Options while reading the data from CSV file.

In [6]:
df_options_delimeter = spark.read.options(delimiter=',') \
  .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

In [7]:
df_options_delimeter.count()

41702

In [5]:
df_option_delimeter = spark.read.option("delimiter",',') \
  .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

In [8]:
df_option_delimeter.count()

41702

* Delimeter is used to specified the csv file delimeter.

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [9]:
df_options_inferschema = spark.read.options(inferSchema='True',delimiter=',') \
                              .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

In [10]:
df_option_inferschema = spark.read.option("inferSchema",True) \
                            .option("delimiter",",") \
                            .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

* default value of InferSchema is False
* When you set InferSchema is True then it automatically infer the schema if the column.

In [11]:
df_option_inferschema.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [13]:
df_options_inferschema_header = spark.read.options(header=True, inferSchema=True, delimiter=',') \
                        .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

In [14]:
df_option_inferschema_header = spark.read.option("header",True) \
                        .option("inferSchema",True) \
                        .option("delimiter",',')  \
                        .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

In [15]:
df_option_inferschema_header.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Industry_aggregation_NZSIOC: string (nullable = true)
 |-- Industry_code_NZSIOC: string (nullable = true)
 |-- Industry_name_NZSIOC: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- Variable_code: string (nullable = true)
 |-- Variable_name: string (nullable = true)
 |-- Variable_category: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Industry_code_ANZSIC06: string (nullable = true)



root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [50]:
df_option_inferschema_header.select("Value","Industry_code_NZSIOC", "Value").show()

+---------+--------------------+---------+
|    Value|Industry_code_NZSIOC|    Value|
+---------+--------------------+---------+
|   69,127|               99999|   69,127|
|  103,330|               99999|  103,330|
|2,512,677|               99999|2,512,677|
|  730,587|               99999|  730,587|
|  591,351|               99999|  591,351|
|1,190,739|               99999|1,190,739|
|2,512,677|               99999|2,512,677|
|  813,949|               99999|  813,949|
|  933,093|               99999|  933,093|
|  765,635|               99999|  765,635|
|  400,900|               99999|  400,900|
|   54,700|               99999|   54,700|
|       78|               99999|       78|
|       71|               99999|       71|
|       13|               99999|       13|
|        4|               99999|        4|
|       32|               99999|       32|
|   48,731|                  AA|   48,731|
|   45,650|                  AA|   45,650|
|      640|                  AA|      640|
+---------+

* As you can see from the coloums all are not string It is Integer let use feature of spark and change it 

In [51]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

structureSchema = StructType([ \
    StructField("Year",IntegerType(),True), \
    StructField("Industry_aggregation_NZSIOC",StringType(),True), \
    StructField("Industry_code_NZSIOC",IntegerType(),True), \
    StructField("Industry_name_NZSIOC",StringType(),True), \
    StructField("Units", StringType(), True), \
    StructField("Variable_code", StringType(), True), \
    StructField("Variable_name", StringType(), True), \
    StructField("Variable_category", StringType(), True), \
    StructField("Value", IntegerType(), True), \
    StructField("Industry_code_ANZSIC06", StringType(), True) \
  ])

In [52]:
df_options = spark.read.schema(structureSchema) \
                        .option("header",True) \
                        .option("inferSchema",True) \
                        .option("delimiter",',') \
                        .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

In [53]:
df_options.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Industry_aggregation_NZSIOC: string (nullable = true)
 |-- Industry_code_NZSIOC: integer (nullable = true)
 |-- Industry_name_NZSIOC: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- Variable_code: string (nullable = true)
 |-- Variable_name: string (nullable = true)
 |-- Variable_category: string (nullable = true)
 |-- Value: integer (nullable = true)
 |-- Industry_code_ANZSIC06: string (nullable = true)



In [48]:
df_options.select("Industry_code_ANZSIC06")

+----------------------+
|Industry_code_ANZSIC06|
+----------------------+
|  ANZSIC06 division...|
|  ANZSIC06 division...|
+----------------------+
only showing top 2 rows



In [38]:
df_option1 = spark.read.options(schema=structureSchema,header=True, inferSchema=True, delimiter=',') \
                        .csv("C:/Users/RajanSahu/Desktop/study/Pyspark/data/annual1.csv")

In [39]:
df_option1.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Industry_aggregation_NZSIOC: string (nullable = true)
 |-- Industry_code_NZSIOC: string (nullable = true)
 |-- Industry_name_NZSIOC: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- Variable_code: string (nullable = true)
 |-- Variable_name: string (nullable = true)
 |-- Variable_category: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Industry_code_ANZSIC06: string (nullable = true)

