# Read, Write and Validate Code Along Activity

__Objectives:__  
- Reading in data;
- Partitioned files;
- Validating data;
- Specifying data types;
- Writing data;

In [5]:
%config Completer.use_jedi = False

import os
import pyspark
# import findspark

from pyspark.sql import SparkSession

# findspark.init()

path = "datasets/"

spark = SparkSession.builder.appName("ReadWriteVal").getOrCreate()
spark

In [3]:
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
cores

1

In [6]:
students = spark.read.csv(
    os.path.join(path, "students.csv"), 
    inferSchema=True,
    header=True
)
students.limit(4).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44


In [10]:
parquet = spark.read.parquet(
    os.path.join(path, "users1.parquet")
)
print(parquet.count())
parquet.limit(4).toPandas()

1000


Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 05:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 15:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-02 23:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-02 22:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,


In [12]:
partitioned = spark.read.parquet(
    os.path.join(path, "users*")
)
print(partitioned.count())
partitioned.limit(4).toPandas()

3000


Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 05:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 15:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-02 23:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-02 22:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,


>__NOTE:__
>In cases where you have defined a partition key, when you load all files, 
>that key wont be shown. To workaround it you have to use the option "bathPath"
>like following:  
>  
>````python
>spark.read.option("bathPath", path).parquet(path+"users*")
>````

## Validating Data

In [13]:
students = spark.read.csv(
    os.path.join(path, "students.csv"), 
    inferSchema=True,
    header=True
)
students.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [14]:
students.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [15]:
students.describe()

DataFrame[summary: string, gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: string, reading score: string, writing score: string]

In [16]:
students.schema["math score"].dataType

IntegerType

In [18]:
students.select("math score", "reading score").summary("count", "min", "max").show()

+-------+----------+-------------+
|summary|math score|reading score|
+-------+----------+-------------+
|  count|      1000|         1000|
|    min|         0|           17|
|    max|       100|          100|
+-------+----------+-------------+



## How to specify data types

In [26]:
from pyspark.sql.types import (
    StructField, 
    StructType,
    StringType,
    DateType,
)

data_schema = [
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("city", StringType(), True),
    StructField("mac", StringType(), True),
    StructField("timestamp", DateType(), True),
    StructField("creditcard", StringType(), True),
]

final_struct = StructType(fields=data_schema)

people = spark.read.json(
    os.path.join(path, "people.json"),
    schema=final_struct
)

people.limit(4).toPandas()

Unnamed: 0,name,email,city,mac,timestamp,creditcard
0,,,,,,
1,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,2015-04-25,1228-1221-1221-1431
2,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,2015-04-25,1228-1221-1221-1431
3,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,2015-04-25,


In [27]:
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



## Writing Data

In [14]:
students = spark.read.csv(
    os.path.join(path, "students.csv"), 
    inferSchema=True,
    header=True
)

students.write.mode("overwrite").csv("output/write_test.csv")

>

>__NOTE:__  
>By default, when you call write, spark will always asume that you're saving a 
>partitioned file. Thus it will create a directory with the name that you have 
>specified and the content of this directory will be the file partitions. 
>Although, if you don't whant this behavior, you have to use the following 
>workaround:  
>````python
>import os
>
>from py4j.java_gateway import java_import
>
>java_import(spark._jvm, "org.apache.hadoop.fs.Path")
>
>fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
>    spark._jsc.hadoopConfiguration()
>)
>
>file = fs.globStatus(spark._jvm.Path("write_test.csv/part*"))[0].getPath().getName()
>
>fs.rename(
>    spark._jvm.Path(os.path.join("write_test.csv", file)),
>    spark._jvm.Path("write_test_flat.csv")
>)
>
>fs.delete(spark._jvm.Path("write_test.csv"), True)
>````

In [16]:
users1_2 = spark.read.parquet(
    os.path.join(path, "users1.parquet"),
    os.path.join(path, "users2.parquet"),
)

In [18]:
users1_2.write.mode("overwrite").parquet("output/parquet/")

In [19]:
users1_2.write.mode("overwrite").partitionBy("gender").parquet("output/part_parquet/")