## **Read, Write and Validate**

**Start PySpark Session**

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 42.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=11b67844e12c1a1e9e92552276108b8d49f4dc286d787fc31a4091aa6f3023b1
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
import pyspark 
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName('ReadWriteVal').getOrCreate()
spark

**Read dataset**

In [None]:
students = spark.read.csv('students.csv', inferSchema = True, header = True)

In [None]:
# Show first 5 rows of datasers in pandas format 
students.limit(5).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44
4,male,group C,some college,standard,none,76,78,75


In [None]:
# Reading parquet file - Most compact file storage method 
parquet = spark.read.parquet('users1.parquet')
parquet.limit(5).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


In [None]:
# Reading all 3 parquet files at once 
partitioned = spark.read.parquet('users*') # users* will pick up anything that start with users
partitioned.limit(5).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


In [None]:
# Reading just users1 and users2
users_1_2 = spark.read.parquet('users1.parquet', 'users2.parquet')
users_1_2.limit(5).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


**Validate Data**

In [None]:
# See data type 
students.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [None]:
# See columns 
students.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [None]:
# Summary of some data columns 
students.select('math score', 'reading score').summary('count', 'min', 'max').show()

+-------+----------+-------------+
|summary|math score|reading score|
+-------+----------+-------------+
|  count|      1000|         1000|
|    min|         0|           17|
|    max|       100|          100|
+-------+----------+-------------+



**Specify Data Type**

In [None]:
from pyspark.sql.types import * 

In [None]:
data_schema = [StructField('name', StringType(),True), 
               StructField('email', StringType(),True), 
               StructField('city', StringType(),True), 
               StructField('mac', StringType(),True), 
               StructField('timestamp', DateType(),True), 
               StructField('creditcard', StringType(),True)]

In [None]:
final_struc = StructType(fields = data_schema)

In [None]:
people = spark.read.json('people.json', schema = final_struc)
people.limit(5).toPandas()

Unnamed: 0,name,email,city,mac,timestamp,creditcard
0,,,,,,
1,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,2015-04-25,1228-1221-1221-1431
2,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,2015-04-25,1228-1221-1221-1431
3,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,2015-04-25,
4,Celine Ankunding,emery_kunze@rogahn.net,,3a:af:c9:0b:5c:08,2015-04-25,1228-1221-1221-1431


In [None]:
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



## **Search and Filter Data Frames**

**Read Dataset**

In [None]:
spark = SparkSession.builder.appName('SearchandFilter').getOrCreate()
spark

In [None]:
fifa = spark.read.csv('fifa19.csv', inferSchema = True, header = True)
fifa.limit(5).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
2,2,190871,Neymar Jr,26,https://cdn.sofifa.org/players/4/19/190871.png,Brazil,https://cdn.sofifa.org/flags/54.png,92,93,Paris Saint-Germain,...,94,27,24,33,9,9,15,15,11,€228.1M
3,3,193080,De Gea,27,https://cdn.sofifa.org/players/4/19/193080.png,Spain,https://cdn.sofifa.org/flags/45.png,91,93,Manchester United,...,68,15,21,13,90,85,87,88,94,€138.6M
4,4,192985,K. De Bruyne,27,https://cdn.sofifa.org/players/4/19/192985.png,Belgium,https://cdn.sofifa.org/flags/7.png,91,92,Manchester City,...,88,68,58,51,15,13,5,10,13,€196.4M


In [None]:
fifa.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Photo: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Flag: string (nullable = true)
 |-- Overall: integer (nullable = true)
 |-- Potential: integer (nullable = true)
 |-- Club: string (nullable = true)
 |-- Club Logo: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Wage: string (nullable = true)
 |-- Special: integer (nullable = true)
 |-- Preferred Foot: string (nullable = true)
 |-- International Reputation: integer (nullable = true)
 |-- Weak Foot: integer (nullable = true)
 |-- Skill Moves: integer (nullable = true)
 |-- Work Rate: string (nullable = true)
 |-- Body Type: string (nullable = true)
 |-- Real Face: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Jersey Number: integer (nullable = true)
 |-- Joined: string (nullable = true)
 |-- Loaned From: string (nu

**Select**

In [None]:
from pyspark.sql.functions import *

In [None]:
# Only show nationality, name and age columns 

fifa.select('Nationality', 'Name', 'Age').show(5, False)

+-----------+-----------------+---+
|Nationality|Name             |Age|
+-----------+-----------------+---+
|Argentina  |L. Messi         |31 |
|Portugal   |Cristiano Ronaldo|33 |
|Brazil     |Neymar Jr        |26 |
|Spain      |De Gea           |27 |
|Belgium    |K. De Bruyne     |27 |
+-----------+-----------------+---+
only showing top 5 rows



In [None]:
# Show name and age columns and order by age in ascending order 

fifa.select('Name', 'Age').orderBy(fifa['Age']).show(5)

+--------------+---+
|          Name|Age|
+--------------+---+
|     Y. Roemer| 16|
|   J. Kitolano| 16|
|   Y. Begraoui| 16|
|Y. Verschaeren| 16|
|      J. Lahne| 16|
+--------------+---+
only showing top 5 rows



In [None]:
# Show name and age columns and order by age in descending order 

fifa.select('Name', 'Age').orderBy(fifa['Age'].desc()).show(5)

+-------------+---+
|         Name|Age|
+-------------+---+
|     O. Pérez| 45|
|K. Pilkington| 44|
|    T. Warner| 44|
|  S. Narazaki| 42|
|     M. Tyler| 41|
+-------------+---+
only showing top 5 rows



In [None]:
# Find all data where club contains Barcelona 

fifa.select(['Name', 'Club']).where(fifa.Club.like('%Barcelona%')).show(5,False)

+---------------+------------+
|Name           |Club        |
+---------------+------------+
|L. Messi       |FC Barcelona|
|L. Suárez      |FC Barcelona|
|M. ter Stegen  |FC Barcelona|
|Sergio Busquets|FC Barcelona|
|Coutinho       |FC Barcelona|
+---------------+------------+
only showing top 5 rows



In [None]:
# Find more than 1 clubname 

fifa[fifa.Club.isin("FC Barcelona", "Juventus")].limit(5).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
2,7,176580,L. Suárez,31,https://cdn.sofifa.org/players/4/19/176580.png,Uruguay,https://cdn.sofifa.org/flags/60.png,91,91,FC Barcelona,...,85,62,45,38,27,25,31,33,37,€164M
3,15,211110,P. Dybala,24,https://cdn.sofifa.org/players/4/19/211110.png,Argentina,https://cdn.sofifa.org/flags/52.png,89,94,Juventus,...,84,23,20,20,5,4,4,5,8,€153.5M
4,18,192448,M. ter Stegen,26,https://cdn.sofifa.org/players/4/19/192448.png,Germany,https://cdn.sofifa.org/flags/21.png,89,92,FC Barcelona,...,69,25,13,10,87,85,88,85,90,€123.3M


In [None]:
# Find name and club for name that starts with L and ends with i 

fifa.select("Name", "Club").where(fifa.Name.startswith("L")).where(fifa.Name.endswith("i")).show(5)

+-------------+---------------+
|         Name|           Club|
+-------------+---------------+
|     L. Messi|   FC Barcelona|
|   L. Bonucci|       Juventus|
| L. Fabiański|West Ham United|
|L. Pellegrini|           Roma|
| L. Pavoletti|       Cagliari|
+-------------+---------------+
only showing top 5 rows



**Slicing**

In [None]:
# Select only first 5 columns 

col_list = fifa.columns[0:5]
df = fifa.select(col_list)

df.show()

+---+------+-----------------+---+--------------------+
|_c0|    ID|             Name|Age|               Photo|
+---+------+-----------------+---+--------------------+
|  0|158023|         L. Messi| 31|https://cdn.sofif...|
|  1| 20801|Cristiano Ronaldo| 33|https://cdn.sofif...|
|  2|190871|        Neymar Jr| 26|https://cdn.sofif...|
|  3|193080|           De Gea| 27|https://cdn.sofif...|
|  4|192985|     K. De Bruyne| 27|https://cdn.sofif...|
|  5|183277|        E. Hazard| 27|https://cdn.sofif...|
|  6|177003|        L. Modrić| 32|https://cdn.sofif...|
|  7|176580|        L. Suárez| 31|https://cdn.sofif...|
|  8|155862|     Sergio Ramos| 32|https://cdn.sofif...|
|  9|200389|         J. Oblak| 25|https://cdn.sofif...|
| 10|188545|   R. Lewandowski| 29|https://cdn.sofif...|
| 11|182521|         T. Kroos| 28|https://cdn.sofif...|
| 12|182493|         D. Godín| 32|https://cdn.sofif...|
| 13|168542|      David Silva| 32|https://cdn.sofif...|
| 14|215914|         N. Kanté| 27|https://cdn.so

**Filter**

In [None]:
fifa.filter('Overall > 50').limit(5).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
2,2,190871,Neymar Jr,26,https://cdn.sofifa.org/players/4/19/190871.png,Brazil,https://cdn.sofifa.org/flags/54.png,92,93,Paris Saint-Germain,...,94,27,24,33,9,9,15,15,11,€228.1M
3,3,193080,De Gea,27,https://cdn.sofifa.org/players/4/19/193080.png,Spain,https://cdn.sofifa.org/flags/45.png,91,93,Manchester United,...,68,15,21,13,90,85,87,88,94,€138.6M
4,4,192985,K. De Bruyne,27,https://cdn.sofifa.org/players/4/19/192985.png,Belgium,https://cdn.sofifa.org/flags/7.png,91,92,Manchester City,...,88,68,58,51,15,13,5,10,13,€196.4M


In [None]:
fifa.filter('Overall > 50').select(['Name', 'Age']).limit(5).toPandas()

Unnamed: 0,Name,Age
0,L. Messi,31
1,Cristiano Ronaldo,33
2,Neymar Jr,26
3,De Gea,27
4,K. De Bruyne,27


In [None]:
results = fifa.filter('Overall > 50').select(['Nationality', 'Name', 'Age', 'Overall']).orderBy(fifa['Overall'].desc()).collect()

In [None]:
type(results[0])

pyspark.sql.types.Row

In [None]:
print("Best Player over 50", results[0][1])
print('Worst Player over 50', results[-1][0])

Best Player over 50 L. Messi
Worst Player over 50 Ghana
