# Reading Writing and Validating Data in PySpark

I will be: 

 - Reading in Data
 - Partioned Files
 - Validating Data
 - Specifying Data Types
 - Writing Data

 

In [1]:
# First let's create our PySpark instance!

# PC users can use the next two lines of code but mac users don't need it
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("ReadWriteVal").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Reading data

A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various functions in SparkSession.

First let's try reading in a csv file containing a list of students and their grades.

**Source:** https://www.kaggle.com/spscientist/students-performance-in-exams

In [8]:
# Start by reading a basic csv dataset
# Let Spark know about the header and infer the Schema types!

students =spark.read.csv("/Users/riteshtripathi/Desktop/Pyspark DataScience/Read Write Validate Datasets/students.csv", 
                         inferSchema = True, header = True)

# Some csv data
#students = spark.read.csv(path+'students.csv',inferSchema=True,header=True)

students.show()
#this will look more ugly to read as it will have alot of colns to show
#to show better, convert dataframe to pandas
students.limit(4).toPandas()

+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|     

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44


**Parquet Files**

Now try reading in a parquet file. This is most common data type in the big data world.
Why? because it is the most compact file storage method (even better than zipped files!)

In [10]:
# How to read parquet files
#these files are better than zip and csv files
#big organization use these kind of files
parquet = spark.read.parquet("/Users/riteshtripathi/Desktop/Pyspark DataScience/Read Write Validate Datasets/users1.parquet")
parquet.show(2)
parquet.limit(4).toPandas()

# Parquet files are partioned files

+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 10:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 20:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 10:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 20:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 04:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 03:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,


**Partitioned Parquet Files**

Actually most big datasets will be partitioned. Here is how you can collect all the pieces (parts) of the dataset in one simple command.

In [15]:
# Since i have many parquet files, to read all of them at once:
partitioned = spark.read.parquet("/Users/riteshtripathi/Desktop/Pyspark DataScience/Read Write Validate Datasets/"+'users*')
#anything that starts with users.. load those files
partitioned.show(2)
partitioned.limit(4).toPandas()

+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 10:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 20:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 10:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 20:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 04:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 03:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,


You can also opt to read in only a specific set of paritioned parquet files. Say for example that you only wanted users1 and users2 and not users3

In [5]:
# Note that the .option("basePath", path) option is used to override the automatic function
# that will exclude the partitioned variable in resulting dataframe. 
# I prefer to have the partitioning info in my new dataframe personally. 
# Now, just to get User 1 and User 2 i need
#users1_2 = spark.read.option("basePath", path).parquet(path+'users1.parquet', path+'users2.parquet')
users1_2 = spark.read.parquet("/Users/riteshtripathi/Desktop/Pyspark DataScience/Read Write Validate Datasets/"+'users1.parquet', +'users2.parquet')
users1_2.show()
#got issues with this command

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



## Validating Data

Next you will want to validate that you dataframe was read in correct. We will get into more detailed data evaluation later on but first we need to ensure that all the variable types were infered correctly and that the values actually made it in... sometimes they don't :)

In [22]:
# Get an inital view of your dataframe
students.show(3)

students.printSchema()

students.columns

students.describe()

students.schema['math score'].dataType

students.select("math score", "reading score").summary("count", "min", "max").show()
#will get summary for each of the variables

+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|   lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|standard|                   none|        72|           72|           74|
|female|       group C|               some college|standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|standard|                   none|        90|           95|           93|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
only showing top 3 rows

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental leve

In [6]:
# If your dataframe is more than just a few variables, this method is way better
students.limit(5).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44
4,male,group C,some college,standard,none,76,78,75


In [None]:
# Note the types here:
print(type(students))
studentsPdf = students.toPandas()
print(type(studentsPdf))

In [7]:
# A Solid Summary of your data:

#show the data (like df.head())
print(students.printSchema())
print("")
print(students.columns)
print("")
print(students.describe()) # Not so fond of this one but to each their own

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)

None

['gender', 'race/ethnicity', 'parental level of education', 'lunch', 'test preparation course', 'math score', 'reading score', 'writing score']

DataFrame[summary: string, gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: string, reading score: string, writing score: string]


In [10]:
# If you need to get the type of just ONE column by name you can use this function:
students.schema['math score'].dataType

IntegerType

In [9]:
# Neat "describe" function
students.describe(['math score']).show()

+-------+------------------+
|summary|        math score|
+-------+------------------+
|  count|              1000|
|   mean|            66.089|
| stddev|15.163080096009454|
|    min|                 0|
|    max|               100|
+-------+------------------+



In [8]:
# Summary function
students.select("math score", "reading score","writing score").summary("count", "min", "25%", "75%", "max").show()

+-------+----------+-------------+-------------+
|summary|math score|reading score|writing score|
+-------+----------+-------------+-------------+
|  count|      1000|         1000|         1000|
|    min|         0|           17|           10|
|    25%|        57|           59|           57|
|    75%|        77|           79|           79|
|    max|       100|          100|          100|
+-------+----------+-------------+-------------+



## How to specify data types as you read in datasets.

Some data types make it easier to infer schema (like tabular formats such as csv which we will show later). 

However you often have to set the schema yourself if you aren't dealing with a .read method that doesn't have inferSchema() built-in.

Spark has all the tools you need for this, it just requires a very specific structure:

In [23]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType,DateType

Next we need to create the list of Structure fields
    * :param name: string, name of the field.
    * :param dataType: :class:`DataType` of the field.
    * :param nullable: boolean, whether the field can be null (None) or not.

In [24]:
data_schema = [StructField("name", StringType(), True),
               StructField("email", StringType(), True),
               StructField("city", StringType(), True),
               StructField("mac", StringType(), True),
               StructField("timestamp", DateType(), True),
               StructField("creditcard", StringType(), True)]

In [25]:
final_struc = StructType(fields=data_schema)
#creating an object to hold the structure

We'll do a .json file this time :) 

**Source:** https://gist.github.com/raine/da15845f332a2fb8937b344504abfbe0

In [26]:
people = spark.read.json("/Users/riteshtripathi/Desktop/Pyspark DataScience/Read Write Validate Datasets/"+'people.json', 
                         schema=final_struc)

In [28]:
people.printSchema()
people.limit(4).toPandas()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



Unnamed: 0,name,email,city,mac,timestamp,creditcard
0,,,,,,
1,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,2015-04-25,1228-1221-1221-1431
2,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,2015-04-25,1228-1221-1221-1431
3,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,2015-04-25,


## Writing Data

First let's just try writing a simple csv file.

In [None]:
# Note the funky naming convention of the file in your output folder. There is no way to directly change this. 
students.write.mode("overwrite").csv('write_test.csv')
#this wil save the file with a different file name, which is in partioned type
#to better save it in a better naming, follow the below code

Note the strange naming convention of the output file in the path that you specified. Spark uses Hadoop File Format, which requires data to be partitioned - that's why you have part- files. If you want to rename your written files to a more user friendly format, you can do that using the method below:

In [None]:
from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(spark._jvm.Path('write_test.csv/part*'))[0].getPath().getName()
fs.rename(spark._jvm.Path('write_test.csv/' + file), spark._jvm.Path('write_test2.csv')) #these two need to be different
fs.delete(spark._jvm.Path('write_test.csv'), True)

#the file will be saved as write_test2.csv 
#and then i can double click on the name and mention whatever name i need to give

#### Writting Parquet files

Now let's try writing a parquet file. This is best practice for big data as it is the most compact storage method.

In [None]:
users1_2.write.mode("overwrite").parquet('parquet/')

For those who got an error attempting to run the above code. Try this solution: https://stackoverflow.com/questions/59220832/unable-to-write-spark-dataframe-to-a-parquet-file-format-to-c-drive-in-pyspark

#### Writting Partitioned Parquet Files

Now try to write a partioned parquet file... super fun!

In [None]:
# Partition by gender
# you can partition also by several variables too
users1_2.write.mode("overwrite").partitionBy("gender").parquet('part_parquet/')

#### Writting your own dataframes here!

You can also create your own dataframes directly here in your Juypter Notebook too if you want. 

Like this!

In [None]:
values = [('Pear',10),('Orange',36),('Banana',123),('Kiwi',48),('Peach',16),('Strawberry',1)]
df = spark.createDataFrame(values,['fruit','quantity'])
df.show()