In [2]:
# Set the PySpark environment variables
import os
os.environ['SPARK_HOME'] = "/Users/apple/Spark"
os.environ['PYSPARK_DRIVER_PYTHON'] = "jupyter"
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "lab"
os.environ['PYSPARK_PYTHON'] = "python"

In [3]:
# Import PySpark
from pyspark.sql import SparkSession

In [4]:
# Create a SparkSession
spark = SparkSession.builder.appName("PySpark-GettingStarted").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/26 17:30:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
spark

In [8]:
# To know how many cores we have for using Spark, here the output is 1 as we are running Spark locally on a single cluster
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
cores

1

### Reading Data

In [15]:
path = '/Users/apple/Downloads/Pyspark course resources/Jupyter Notebooks and Datasets AS of 16MAY21/PySpark DataFrame Essentials/Datasets/'

In [17]:
# Reading a csv file
students_data = spark.read.csv(path+ 'students.csv',inferSchema=True, header=True)
# To view only first four rows we set limit 4 and then to view it in a Pandas like dataframe format we use toPandas() method
students_data.limit(4).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44


In [19]:
# Reading a Parquet file
users_data = spark.read.parquet(path + 'users1.parquet')

users_data.limit(5).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 13:25:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 22:34:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 06:39:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 06:06:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 10:35:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


In [22]:
# Finding out the number of rows in the read data
users_data.count()

1000

In [20]:
# Reading multiple parquet files which are partitioned
partitioned_parquetes = spark.read.parquet(path + 'users*') # We have three files users1.parquet, users2.parquet and users3.parquet in our local path, all will be read at once
partitioned_parquetes.limit(5).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 13:25:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 22:34:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 06:39:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 06:06:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 10:35:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


In [23]:
partitioned_parquetes.count()

3000

In [24]:
# To just read specific parquet files from the local
specific_parquetes = spark.read.parquet(path+'users1.parquet', path+'users1.parquet')
specific_parquetes.limit(5).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 13:25:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 22:34:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 06:39:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 06:06:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 10:35:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


In [25]:
specific_parquetes.count()

2000

### Validating Data

In [26]:
# To check the schema for our dataframe
students_data.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [27]:
# To get a list of all the columns
students_data.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [29]:
# To get list of all the columns in the dataframe along with their data types we can use
students_data.describe()

DataFrame[summary: string, gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: string, reading score: string, writing score: string]

In [30]:
# To get data type for any specific column
students_data.schema['math score'].dataType

IntegerType()

### Specifying data type before reading data based on pre-knowledge about the data

In [31]:
from pyspark.sql.types import *

In [33]:
data_schema = [ StructField("name", StringType(), True),
               StructField("email", StringType(), True),
               StructField("city", StringType(), True),
               StructField("mac", StringType(), True),
               StructField("timestamp", DateType(), True),
               StructField("creditcard", StringType(), True)
              ]
final_structure = StructType(fields=data_schema)

In [34]:
people_data = spark.read.json(path+'people.json', schema=final_structure)
people_data.limit(5).toPandas()

Unnamed: 0,name,email,city,mac,timestamp,creditcard
0,,,,,,
1,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,2015-04-25,1228-1221-1221-1431
2,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,2015-04-25,1228-1221-1221-1431
3,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,2015-04-25,
4,Celine Ankunding,emery_kunze@rogahn.net,,3a:af:c9:0b:5c:08,2015-04-25,1228-1221-1221-1431


In [35]:
people_data.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



### Writing the files to the local system

In [48]:
# Below command stores the csv file in partitioned format, as PySpark uses Hadoop file format which requires data to be partitioned (file is stored in a folder named Written_Students.csv with some name part---)
students_data.write.mode("overwrite").csv(path + 'Written_Students.csv',header=True)

In [49]:
# To replace that by default file format, we can use a workaround as given below
from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(spark._jvm.Path(path + 'Written_Students.csv/part*'))[0].getPath().getName()
fs.rename(spark._jvm.Path(path + 'Written_Students.csv/'+file), spark._jvm.Path(path+'Written_Students_new.csv')) 
fs.delete(spark._jvm.Path(path+'Written_Students.csv'),True)

True

In [53]:
# To save the parquet files
partitioned_parquetes.write.mode("overwrite").parquet(path+'partitioned_parqutes_new')

In [54]:
# To save parquet file partitioned by specific columns
partitioned_parquetes.write.mode("overwrite").partitionBy('gender').parquet(path+'partitioned_parqutes_partitioned_by_gender')

### Creating a custom Spark DataFrame

In [5]:
data = [("Alice", 25), ("Bob", 28), ("Charlie", 50)]
df = spark.createDataFrame(data, ["Name","Age"])
df.show()

                                                                                

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 28|
|Charlie| 50|
+-------+---+

