# 1-createDataFrame()
- a df is a distributed collection of data organized into named columns.
- It is equivalent to a table in a relational database.

In [8]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("read_write").getOrCreate()
spark
# some csv files, uncomment to download
#!wget -q https://sample-videos.com/csv/Sample-Spreadsheet-1000-rows.csv
#!wget -q https://www.stats.govt.nz/assets/Uploads/Business-operations-survey/Business-operations-survey-2022/Download-data/business-operations-survey-2022-information-and-communications-technology.csv

In [9]:
spark.version

'3.1.1'

In [5]:
#/content/drive/MyDrive/ff.csv
spark.version

'3.1.1'

In [None]:
import datetime
print(datetime)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as func

In [None]:
#type(spark)   #pyspark.sql.session.SparkSession
#dir(spark)  # 'dir' shows all the things we can do with spark.
#type(spark.createDataFrame) # a method
help(spark.createDataFrame) # use help to get info about keywords

In [None]:
!wget -q https://sample-videos.com/csv/Sample-Spreadsheet-1000-rows.csv
!mv Sample-Spreadsheet-1000-rows.csv unk.csv # renaming it

/content/unk.csv

In [None]:
# method ((1))
data =[(1,'hero',4.5),(2,'villain',5.0),(3,'cam',3.4)]
schema = ['id','job','level']
hdf = spark.createDataFrame(data=data,schema=schema)
hdf.show()
hdf.printSchema()

+---+-------+-----+
| id|    job|level|
+---+-------+-----+
|  1|   hero|  4.5|
|  2|villain|  5.0|
|  3|    cam|  3.4|
+---+-------+-----+

root
 |-- id: long (nullable = true)
 |-- job: string (nullable = true)
 |-- level: double (nullable = true)



In [None]:
# method ((2))
dat = [
      {'id':1,'name':'sai'},
      {'id':2,'name':'julia'},
      {'id':3,'name':'roani'}
]
ddf = spark.createDataFrame(data=dat)
ddf.show()
ddf.printSchema()

+---+-----+
| id| name|
+---+-----+
|  1|  sai|
|  2|julia|
|  3|roani|
+---+-----+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



# read-writing data to csv file

In [None]:
# method ((3))
# creating schema (another way)
# to use StrucType, we must import pyspark.sql.types library
mysc = StructType([\
                   StructField('userid',IntegerType(),True),
                   StructField('name',StringType(),True),
                   StructField('age',IntegerType(),True),
                   StructField('friends',StringType(),True),
                   ])

In [None]:
# reading data from a csv file
people = spark.read.format('csv')\
              .schema(mysc)\
              .option('path','/content/ff.csv').load()
#or
#peop = spark.read.csv('/content/ff.csv',header=False)


In [None]:
p = people

In [None]:
o = people.select(p.userid,p.name,p.age,p.friends)\
          .where(p.age > 30).withColumn('insertstmp',func.current_timestamp())\
          .orderBy(p.userid).cache()

In [None]:
o.show(6)

+------+------+---+-------+--------------------+
|userid|  name|age|friends|          insertstmp|
+------+------+---+-------+--------------------+
|     0|  Will| 33|    385|2023-10-14 11:00:...|
|     2|  Hugh| 55|    221|2023-10-14 11:00:...|
|     3|Deanna| 40|    465|2023-10-14 11:00:...|
|     4| Quark| 68|     21|2023-10-14 11:00:...|
|     5|Weyoun| 59|    318|2023-10-14 11:00:...|
|     6|Gowron| 37|    220|2023-10-14 11:00:...|
+------+------+---+-------+--------------------+
only showing top 6 rows



In [None]:
# now write data and creates data partition based on age for optimization
o.write.format('orc').mode('overwrite')\
       .option('path','file:///content/xxyz')\
       .partitionBy('age').save()

In [None]:
p.createOrReplaceTempView('pp')

In [None]:
spark.sql('''select * from pp where age between 65 and 69 order by friends desc''').show()

+------+--------+---+-------+
|userid|    name|age|friends|
+------+--------+---+-------+
|    62|   Keiko| 69|      9|
|   254|    Ezri| 67|     79|
|   116|     Ben| 69|     75|
|   233|  Gowron| 67|     70|
|   266|Jean-Luc| 66|    496|
|    99|   Keiko| 69|    491|
|   170|Jean-Luc| 68|    490|
|   232|    Worf| 68|    481|
|   397|   Quark| 69|    470|
|    80|   Dukat| 67|    445|
|   283|   Dukat| 65|    443|
|   159|  Kasidy| 67|    438|
|   487|   Brunt| 69|    431|
|   365|   Brunt| 65|    430|
|   428| Lwaxana| 68|    423|
|   249|   Nerys| 66|     41|
|    85|   Quark| 66|    383|
|   396|   Keiko| 67|     38|
|    97|   Nerys| 69|    361|
|   384|  Martok| 67|    355|
+------+--------+---+-------+
only showing top 20 rows



# 2-creating DataFrames
- with explicit schema
- wiht RDD
- with CSV file

In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

In [None]:
# method ((3))
# creating df with explicit schema and data
edf = spark.createDataFrame([
    (1,'sai',23,'M',date(2001,3,8),26199.23),
    (2,'joyboy',19,'M',date(2000,9,9),1000.99),
    (3,'sanji',28,"M",date(1999,8,7),16000.22)
], schema = 'id long,name string,age long,gender string,dob date,bounty double')

In [None]:
edf.show()

+---+------+---+------+----------+--------+
| id|  name|age|gender|       dob|  bounty|
+---+------+---+------+----------+--------+
|  1|   sai| 23|     M|2001-03-08|26199.23|
|  2|joyboy| 19|     M|2000-09-09| 1000.99|
|  3| sanji| 28|     M|1999-08-07|16000.22|
+---+------+---+------+----------+--------+



In [None]:
edf.printSchema() # to see the schema structure

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- bounty: double (nullable = true)



In [None]:
edf.dtypes # gives cols and it's datatypes

[('id', 'int'),
 ('name', 'string'),
 ('age', 'int'),
 ('gender', 'string'),
 ('dob', 'date'),
 ('bounty', 'float')]

In [None]:
# trying to change bigint type to int and double to floattype
# using withColumn
edf = edf\
          .withColumn('id',edf['id'].cast('integer'))\
          .withColumn('age',edf['age'].cast('integer'))\
          .withColumn('bounty',edf['bounty'].cast('float'))\
          .withColumn('gender',edf['gender'].cast('boolean'))
edf.dtypes # for multiple cols and single cols


[('id', 'int'),
 ('name', 'string'),
 ('age', 'int'),
 ('gender', 'boolean'),
 ('dob', 'date'),
 ('bounty', 'float')]

In [None]:
edf.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: boolean (nullable = true)
 |-- dob: date (nullable = true)
 |-- bounty: float (nullable = true)



In [None]:
edf.select('id','name').show(2)

+---+------+
| id|  name|
+---+------+
|  1|   sai|
|  2|joyboy|
+---+------+
only showing top 2 rows



In [None]:
#- WITH RDD
rdd = spark.sparkContext.parallelize([
    (1, 4., 'GFG1', date(2000, 8, 1), datetime(2000, 8, 1, 12, 0)),
    (2, 8., 'GFG2', date(2000, 6, 2), datetime(2000, 6, 2, 12, 0)),
    (3, 5., 'GFG3', date(2000, 5, 3), datetime(2000, 5, 3, 12, 0))
])
rdf = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
rdf.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [None]:
rdf = rdf.withColumn('b',rdf['b'].cast('float'))\
         .withColumn('a',rdf['a'].cast('integer'))
rdf.printSchema()

root
 |-- a: integer (nullable = true)
 |-- b: float (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [None]:
rdf.show(2)

+---+---+----+----------+-------------------+
|  a|  b|   c|         d|                  e|
+---+---+----+----------+-------------------+
|  1|4.0|GFG1|2000-08-01|2000-08-01 12:00:00|
|  2|8.0|GFG2|2000-06-02|2000-06-02 12:00:00|
+---+---+----+----------+-------------------+
only showing top 2 rows



 - with csv file creating a DF

In [None]:
# using sampledata provided by spark itself
# file path - /content/sample_data/california_housing_train.csv

Column<'id'>

In [None]:
cdf = spark.createDataFrame(pd.read_csv('/content/sample_data/california_housing_train.csv'))
cdf.printSchema()

  for column, series in pdf.iteritems():


root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
cdf.show(2)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 2 rows



-  another way of reading csv files and creating a DF

In [None]:
ccdf = spark.read.csv('/content/sample_data/mnist_train_small.csv',header = True)
ccdf.printSchema()

# withColumn


In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
sc = ('id integer, name string, age integer, frnds integer')

In [None]:
dff = spark.read.csv('/content/drive/MyDrive/ocsv/ff.csv',schema = sc)

In [None]:
dff.dtypes

[('id', 'int'), ('name', 'string'), ('age', 'int'), ('frnds', 'int')]

In [None]:
dff.printSchema()
dff.show(2)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- frnds: integer (nullable = true)

+---+--------+---+-----+
| id|    name|age|frnds|
+---+--------+---+-----+
|  0|    Will| 33|  385|
|  1|Jean-Luc| 26|    2|
+---+--------+---+-----+
only showing top 2 rows



In [None]:
# col renaming
df1 = dff.withColumnRenamed('frnds','fcirlce')

In [None]:
df1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- fcirlce: integer (nullable = true)



In [None]:
# adding a new column with hardcoded data into cols
from pyspark.sql.functions import lit # lit is must
dfu = df1.withColumn('state',lit("AP"))
dfu.show(5)

+---+--------+---+-------+-----+
| id|    name|age|fcirlce|state|
+---+--------+---+-------+-----+
|  0|    Will| 33|    385|   AP|
|  1|Jean-Luc| 26|      2|   AP|
|  2|    Hugh| 55|    221|   AP|
|  3|  Deanna| 40|    465|   AP|
|  4|   Quark| 68|     21|   AP|
+---+--------+---+-------+-----+
only showing top 5 rows



In [None]:
# creating a copied col with new data
dfc = df1.withColumn('age_af_3yrs',col('age')+ 3)

In [None]:
print('after')
dfc.show(2)

after
+---+--------+---+-------+-----------+
| id|    name|age|fcirlce|age_af_3yrs|
+---+--------+---+-------+-----------+
|  0|    Will| 33|    385|         36|
|  1|Jean-Luc| 26|      2|         29|
+---+--------+---+-------+-----------+
only showing top 2 rows



In [None]:
#now dropping that ageaf3yrs col
dfc = dfc.drop('age_af_3yrs')
dfc.show(3)

+---+--------+---+-------+
| id|    name|age|fcirlce|
+---+--------+---+-------+
|  0|    Will| 33|    385|
|  1|Jean-Luc| 26|      2|
|  2|    Hugh| 55|    221|
+---+--------+---+-------+
only showing top 3 rows



# expression withColumn

In [None]:
dfc.show(5)

+---+--------+---+-------+
| id|    name|age|fcirlce|
+---+--------+---+-------+
|  0|    Will| 33|    385|
|  1|Jean-Luc| 26|      2|
|  2|    Hugh| 55|    221|
|  3|  Deanna| 40|    465|
|  4|   Quark| 68|     21|
+---+--------+---+-------+
only showing top 5 rows



In [None]:
dfj =dfc.withColumn('age*fcircle',expr('age * 2'))
dfj.show(5)

In [None]:
dfc = dfc.withColumn('bg',when(col('fcircle') >= 150))

- reading a json file

In [None]:
jdf = spark.read.json('/content/sample_data/anscombe.json',multiLine=True)
jdf.show(6)
jdf.printSchema()

In [None]:
jdf.withColumn('both')

# practice

In [None]:
# reading a csv
# some csv files, uncomment to download
#!wget -q https://sample-videos.com/csv/Sample-Spreadsheet-1000-rows.csv
#!wget -q https://www.stats.govt.nz/assets/Uploads/Business-operations-survey/Business-operations-survey-2022/Download-data/business-operations-survey-2022-information-and-communications-technology.csv

mysc = StructType([\
                   StructField('id',IntegerType(),True),
                   StructField('product',StringType(),True),
                   StructField('uname',StringType(),True),
                   StructField('price',IntegerType(),True),
                   StructField('raw_price',FloatType(),True),
                   StructField('tax',FloatType(),True),
                   StructField('city',StringType(),True),
                   StructField('product_type',StringType(),True),
                   StructField('sar_value',DoubleType(),True),
                   ])

In [None]:
#mschema = schema = ['id','products','user','price','raw price','tax','city','type','sar value']

sdf = spark.read.csv('/content/unk.csv',schema=mysc,header=False)
sdf.show(n=5,truncate=False)
sdf.printSchema()

+---+------------------------------------------------------+------------------+-----+---------+------+-----+------------+---------+
|id |product                                               |uname             |price|raw_price|tax   |city |product_type|sar_value|
+---+------------------------------------------------------+------------------+-----+---------+------+-----+------------+---------+
|1  |Eldon Base for stackable storage shelf, platinum      |Muhammed MacIntyre|3    |-213.25  |38.94 |35   |Nunavut     |null     |
|2  |"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators"|Barry French      |293  |457.81   |208.16|68.02|Nunavut     |null     |
|3  |Cardinal Slant-D� Ring Binder, Heavy Gauge Vinyl      |Barry French      |293  |46.71    |8.69  |2.99 |Nunavut     |null     |
|4  |R380                                                  |Clay Rozendal     |483  |1198.97  |195.99|3.99 |Nunavut     |null     |
|5  |Holmes HEPA Air Purifier                              |Carlos Soltero  

In [None]:
dd=sdf.drop('sar_value')

In [None]:
dd.printSchema()

In [None]:
dd = dd.withColumn('non_taxable',when(dd['raw_price'] < 250, "NO tax").otherwise('TAXABLE'))
dd.show(n=8,truncate=False)

+---+------------------------------------------------------+------------------+-----+---------+------+-----+------------+-----------+
|id |product                                               |uname             |price|raw_price|tax   |city |product_type|non_taxable|
+---+------------------------------------------------------+------------------+-----+---------+------+-----+------------+-----------+
|1  |Eldon Base for stackable storage shelf, platinum      |Muhammed MacIntyre|3    |-213.25  |38.94 |35   |Nunavut     |NO tax     |
|2  |"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators"|Barry French      |293  |457.81   |208.16|68.02|Nunavut     |TAXABLE    |
|3  |Cardinal Slant-D� Ring Binder, Heavy Gauge Vinyl      |Barry French      |293  |46.71    |8.69  |2.99 |Nunavut     |NO tax     |
|4  |R380                                                  |Clay Rozendal     |483  |1198.97  |195.99|3.99 |Nunavut     |TAXABLE    |
|5  |Holmes HEPA Air Purifier                              |Ca

In [None]:
dd = dd.drop('city','product_type')

In [None]:
ds = dd.withColumn('wholeprice',dd['price'] + dd['raw_price'])
ds.show(5)

+---+--------------------+------------------+-----+---------+------+-----------+----------+
| id|             product|             uname|price|raw_price|   tax|non_taxable|wholeprice|
+---+--------------------+------------------+-----+---------+------+-----------+----------+
|  1|Eldon Base for st...|Muhammed MacIntyre|    3|  -213.25| 38.94|     NO tax|   -210.25|
|  2|"1.7 Cubic Foot C...|      Barry French|  293|   457.81|208.16|    TAXABLE|    750.81|
|  3|Cardinal Slant-D�...|      Barry French|  293|    46.71|  8.69|     NO tax|    339.71|
|  4|                R380|     Clay Rozendal|  483|  1198.97|195.99|    TAXABLE|   1681.97|
|  5|Holmes HEPA Air P...|    Carlos Soltero|  515|    30.94| 21.78|     NO tax|    545.94|
+---+--------------------+------------------+-----+---------+------+-----------+----------+
only showing top 5 rows



In [None]:
# removing negative raw_price from df
ds = ds.filter(col('raw_price') >= 0)
ds.count()
ds.show(4)

+---+--------------------+--------------+-----+---------+------+-----------+----------+
| id|             product|         uname|price|raw_price|   tax|non_taxable|wholeprice|
+---+--------------------+--------------+-----+---------+------+-----------+----------+
|  2|"1.7 Cubic Foot C...|  Barry French|  293|   457.81|208.16|    TAXABLE|    750.81|
|  3|Cardinal Slant-D�...|  Barry French|  293|    46.71|  8.69|     NO tax|    339.71|
|  4|                R380| Clay Rozendal|  483|  1198.97|195.99|    TAXABLE|   1681.97|
|  5|Holmes HEPA Air P...|Carlos Soltero|  515|    30.94| 21.78|     NO tax|    545.94|
+---+--------------------+--------------+-----+---------+------+-----------+----------+
only showing top 4 rows



In [None]:
# adding new tax col to df
ds = ds.withColumn('newTax',\
                   when(col('raw_price') > 200, col('raw_price') * 1.2)\
                   .when(col('raw_price') > 120, col('raw_price') * 0.8)\
                   .when(col('raw_price') > 50,col('raw_price') * 0.2)\
                   .otherwise(col('raw_price')))
ds.show(10)

+---+--------------------+----------------+-----+---------+------+-----------+----------+------------------+
| id|             product|           uname|price|raw_price|   tax|non_taxable|wholeprice|            newTax|
+---+--------------------+----------------+-----+---------+------+-----------+----------+------------------+
|  2|"1.7 Cubic Foot C...|    Barry French|  293|   457.81|208.16|    TAXABLE|    750.81| 549.3719970703124|
|  3|Cardinal Slant-D�...|    Barry French|  293|    46.71|  8.69|     NO tax|    339.71|46.709999084472656|
|  4|                R380|   Clay Rozendal|  483|  1198.97|195.99|    TAXABLE|   1681.97|  1438.76396484375|
|  5|Holmes HEPA Air P...|  Carlos Soltero|  515|    30.94| 21.78|     NO tax|    545.94|30.940000534057617|
|  6|G.E. Longer-Life ...|  Carlos Soltero|  515|     4.43|  6.64|     NO tax|    519.43| 4.429999828338623|
|  8|SAFCO Mobile Desk...|    Carl Jackson|  613|    127.7| 42.76|     NO tax|     740.7|102.15999755859376|
| 13|Holmes HEPA Ai

In [None]:
ds = ds.drop('price','tax')

In [None]:
daa = ds.withColumn('newTax',substring(col('newTax'),1,8))
daa.show(5)

+---+--------------------+--------------+---------+-----------+----------+--------+
| id|             product|         uname|raw_price|non_taxable|wholeprice|  newTax|
+---+--------------------+--------------+---------+-----------+----------+--------+
|  2|"1.7 Cubic Foot C...|  Barry French|   457.81|    TAXABLE|    750.81|549.3719|
|  3|Cardinal Slant-D�...|  Barry French|    46.71|     NO tax|    339.71|46.70999|
|  4|                R380| Clay Rozendal|  1198.97|    TAXABLE|   1681.97|1438.763|
|  5|Holmes HEPA Air P...|Carlos Soltero|    30.94|     NO tax|    545.94|30.94000|
|  6|G.E. Longer-Life ...|Carlos Soltero|     4.43|     NO tax|    519.43|4.429999|
+---+--------------------+--------------+---------+-----------+----------+--------+
only showing top 5 rows



In [None]:
# pattern finding with 'like'

# user defined StructType

In [None]:
# creating user defined to structtype to store
# name ->(fname,lname)
data = [(1,('roronoa','zoro'),100000),
        (2,('monkey D','Luffy'),400000)]

#name structtype
StructName=StructType([
                       StructField('fname',StringType()),
                       StructField('lname',StringType())])
schema = StructType([
                      StructField('id',IntegerType()),
                      StructField('name',StructName),
                      StructField('bounty',IntegerType())
])

In [None]:
usdf=spark.createDataFrame(data=data,schema=schema)
usdf.show()
usdf.printSchema()

+---+-----------------+------+
| id|             name|bounty|
+---+-----------------+------+
|  1|  {roronoa, zoro}|100000|
|  2|{monkey D, Luffy}|400000|
+---+-----------------+------+

root
 |-- id: integer (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |-- bounty: integer (nullable = true)



In [None]:
print(200)

200


# filter() function
- In PySpark, you can use the filter() function to filter rows from a DataFrame based on a given condition.
- The condition is defined using various functions and operators available in PySpark.

In [None]:
fdf = spark.read.csv('/content/od.csv',header=True)
fdf.show(3)
fdf.printSchema()

+---+-----+-----+---+--------+-------------------+
| id| name|float|age|    time|          timestamp|
+---+-----+-----+---+--------+-------------------+
|  1| John|23.45| 30|12:45:00|2023-10-13 08:30:45|
|  2|Alice|45.67| 25|09:15:00|2023-10-12 14:20:30|
|  3|  Bob|12.34| 35|15:30:00|2023-10-11 20:15:00|
+---+-----+-----+---+--------+-------------------+
only showing top 3 rows

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- float: string (nullable = true)
 |-- age: string (nullable = true)
 |-- time: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
fdf=fdf.withColumn('id',fdf['id'].cast('integer'))\
       .withColumn('age',fdf['age'].cast('integer'))\
       .withColumn('live_age',fdf['float'].cast('float'))\
       .withColumn('timestamp',fdf['timestamp'].cast('timestamp'))
fdf.printSchema() #couldn't change time datatype since time hasn't have a datatype

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- float: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- live_age: float (nullable = true)



In [None]:
fdf.show(5)

+---+-----+-----+---+--------+-------------------+--------+
| id| name|float|age|    time|          timestamp|live_age|
+---+-----+-----+---+--------+-------------------+--------+
|  1| John|23.45| 30|12:45:00|2023-10-13 08:30:45|   23.45|
|  2|Alice|45.67| 25|09:15:00|2023-10-12 14:20:30|   45.67|
|  3|  Bob|12.34| 35|15:30:00|2023-10-11 20:15:00|   12.34|
|  4| Ella|56.78| 42|14:20:00|2023-10-10 10:45:15|   56.78|
|  5|David|34.56| 28|11:05:00|2023-10-09 17:10:45|   34.56|
+---+-----+-----+---+--------+-------------------+--------+
only showing top 5 rows



In [None]:
fdf = fdf.drop('float','time')

In [None]:
fdf.show(3)

+---+-----+---+-------------------+--------+
| id| name|age|          timestamp|live_age|
+---+-----+---+-------------------+--------+
|  1| John| 30|2023-10-13 08:30:45|   23.45|
|  2|Alice| 25|2023-10-12 14:20:30|   45.67|
|  3|  Bob| 35|2023-10-11 20:15:00|   12.34|
+---+-----+---+-------------------+--------+
only showing top 3 rows



In [None]:
# filter -> age >=35 AND timestamp < '2023-10-11 20:15:00'
age_df = fdf.filter((fdf['age'] >= 35) & (fdf['timestamp'] < '2023-10-11 20:15:00'))
age_df.show()

+---+-------+---+-------------------+--------+
| id|   name|age|          timestamp|live_age|
+---+-------+---+-------------------+--------+
|  4|   Ella| 42|2023-10-10 10:45:15|   56.78|
|  6|  Sarah| 55|2023-10-08 12:05:30|    78.9|
|  8| Olivia| 39|2023-10-06 16:40:45|   67.43|
| 12|William| 48|2023-10-02 09:20:30|   42.12|
| 14|  James| 41|2023-09-30 21:10:45|   50.98|
| 16| Samuel| 36|2023-09-28 07:40:00|    61.8|
| 18| Oliver| 45|2023-09-26 19:35:45|   53.76|
+---+-------+---+-------------------+--------+



In [None]:
# now filtering with string operators
ssdf = fdf.filter(col('name').contains('li'))
ssdf.show()

+---+-------+---+-------------------+--------+
| id|   name|age|          timestamp|live_age|
+---+-------+---+-------------------+--------+
|  2|  Alice| 25|2023-10-12 14:20:30|   45.67|
|  8| Olivia| 39|2023-10-06 16:40:45|   67.43|
| 12|William| 48|2023-10-02 09:20:30|   42.12|
| 18| Oliver| 45|2023-09-26 19:35:45|   53.76|
+---+-------+---+-------------------+--------+



In [None]:
# substring
ssdf = ssdf.withColumn('odate',substring(ssdf['timestamp'],1,11))
ssdf.show()

+---+-------+---+-------------------+--------+-----------+
| id|   name|age|          timestamp|live_age|      odate|
+---+-------+---+-------------------+--------+-----------+
|  2|  Alice| 25|2023-10-12 14:20:30|   45.67|2023-10-12 |
|  8| Olivia| 39|2023-10-06 16:40:45|   67.43|2023-10-06 |
| 12|William| 48|2023-10-02 09:20:30|   42.12|2023-10-02 |
| 18| Oliver| 45|2023-09-26 19:35:45|   53.76|2023-09-26 |
+---+-------+---+-------------------+--------+-----------+



In [None]:
ssdf = ssdf.withColumn('odate',ssdf['odate'].cast('date'))
ssdf.printSchema()
# now odate's datatype is 'date'

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- live_age: float (nullable = true)
 |-- odate: date (nullable = true)



In [None]:
ssdf.select('id','name','age').show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|  Alice| 25|
|  8| Olivia| 39|
| 12|William| 48|
| 18| Oliver| 45|
+---+-------+---+



In [None]:
# using like and filter
fdf = fdf.filter(col('name').like('%vi%'))
fdf.show()

+---+------+---+-------------------+--------+
| id|  name|age|          timestamp|live_age|
+---+------+---+-------------------+--------+
|  5| David| 28|2023-10-09 17:10:45|   34.56|
|  8|Olivia| 39|2023-10-06 16:40:45|   67.43|
+---+------+---+-------------------+--------+



In [None]:
# another way of like and filter
a_df = age_df.filter(age_df.name.like('%e%'))
#a_df = age_df.filter(age_df['name'].like("%ce%")) both works fine
a_df.show()

+---+------+---+-------------------+--------+
| id|  name|age|          timestamp|live_age|
+---+------+---+-------------------+--------+
| 14| James| 41|2023-09-30 21:10:45|   50.98|
| 16|Samuel| 36|2023-09-28 07:40:00|    61.8|
| 18|Oliver| 45|2023-09-26 19:35:45|   53.76|
+---+------+---+-------------------+--------+



# groupBy()
-The groupBy() transformation in PySpark is used to group rows of a DataFrame based on one or more columns, allowing you to perform various aggregation and summary operations on each group.

In [None]:
!wget -q https://github.com/datablist/sample-csv-files/raw/main/files/people/people-100.csv
!mv people-100.csv people.csv

In [None]:
#gdf = spark.read.csv('/content/people.csv',header=True)
gdf.show(2,truncate=False)

+-----+---------------+----------+---------+------+---------------------+----------------------+-------------+---------------+
|Index|User Id        |First Name|Last Name|Sex   |Email                |Phone                 |Date of birth|Job Title      |
+-----+---------------+----------+---------+------+---------------------+----------------------+-------------+---------------+
|1    |88F7B33d2bcf9f5|Shelby    |Terrell  |Male  |elijah57@example.net |001-084-906-7849x73518|1945-10-26   |Games developer|
|2    |f90cD3E76f1A9b9|Phillip   |Summers  |Female|bethany14@example.com|214.112.6044x4913     |1910-03-24   |Phytotherapist |
+-----+---------------+----------+---------+------+---------------------+----------------------+-------------+---------------+
only showing top 2 rows



In [None]:
gdf = gdf.withColumn('index',gdf['index'].cast('integer'))\
         .withColumn('date of birth',gdf['date of birth'].cast('date'))
gdf.printSchema()

root
 |-- index: integer (nullable = true)
 |-- User Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- date of birth: date (nullable = true)
 |-- Job Title: string (nullable = true)



In [None]:
# applying aggregation funcs to groups of rows, such as sum, count,avg,min,max

godf = gdf.groupBy('sex')
co = godf.count()
co.show() # returns how many male and females


+------+-----+
|   sex|count|
+------+-----+
|Female|   53|
|  Male|   47|
+------+-----+



In [None]:
# !mv /content/drive/MyDrive/ocsv/original.csv /content/ori.csv

In [None]:
dodf = gdf.groupBy('date of birth')
dateof = dodf.count()
dateof.show() #no one has same dob :)

In [None]:
#sum
sumdf = gdf.groupBy('job title')
samejobs = sumdf.sum('index') # sums the index
samejobs.show()

+--------------------+----------+
|           job title|sum(index)|
+--------------------+----------+
|English as a seco...|        72|
|  Petroleum engineer|        60|
|Conservator, furn...|        92|
|          Counsellor|        27|
|      Hydrogeologist|         8|
|      Police officer|        98|
|IT sales professi...|       100|
|Education officer...|        50|
|Accountant, chart...|        84|
|Accounting techni...|        42|
|   Recycling officer|        28|
|     Games developer|         1|
|                Make|        44|
|     Physiotherapist|        52|
|       Archaeologist|        58|
|Scientist, clinic...|        54|
|        Neurosurgeon|        63|
|Teacher, adult ed...|        19|
|             Curator|        61|
|              Lawyer|         9|
+--------------------+----------+
only showing top 20 rows



In [None]:
# groupby multiple cols
mgdf = gdf.groupBy('last name')
cc = mgdf.count()
cc.show()

+----------+-----+
| last name|count|
+----------+-----+
|      Rich|    1|
|    Conway|    1|
|     Rocha|    1|
|     Brady|    1|
|   Watkins|    1|
|    Levine|    1|
|   Sweeney|    1|
|   Farrell|    1|
|   Schmidt|    1|
|      Hull|    1|
|      Pugh|    1|
|Cunningham|    1|
|    Barnes|    2|
|     Nixon|    1|
|     Hardy|    1|
|    Knight|    1|
|    Briggs|    1|
|     Haney|    1|
|    Norton|    1|
|   Bernard|    1|
+----------+-----+
only showing top 20 rows



# groupBy() - part 2

In [None]:
g2df = spark.read.csv('ori.csv',header=True)
g2df.printSchema()
g2df.show(n=5,truncate=False)

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- City: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

+---+----------+----------+------+-------------+----------------------------+---------+----------+-----------+
|id |first_name|last_name |gender|City         |JobTitle                    |Salary   |Latitude  |Longitude  |
+---+----------+----------+------+-------------+----------------------------+---------+----------+-----------+
|1  |Melinde   |Shilburne |Female|Nowa Ruda    |Assistant Professor         |$57438.18|50.5774075|16.4967184 |
|2  |Kimberly  |Von Welden|Female|Bulgan       |Programmer II               |$62846.60|48.8231572|103.5218199|
|3  |Alvera    |Di Boldi  |Female|null         |null                        |$57576.52|39

In [None]:
subs = g2df.withColumn('salary',substring(g2df['salary'], 2,9))
subs.show() # removing $ symbol from salary col

+---+----------+----------+------+---------------+--------------------+--------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|  salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+--------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|              

In [None]:
wrr = subs.write.csv('noriginal.csv')
# creating new csv and writing clean data into it

In [None]:
# coping and renaming new csv file into drive/ocsv folder
!cp /content/noriginal.csv/part-00000-53416394-42cc-4ba9-a847-d9cc38bfbdc3-c000.csv /content/drive/MyDrive/ocsv/noriginal.csv


In [None]:
# reading csv file from drive and creating df
subs = spark.read.csv('/content/drive/MyDrive/ocsv/noriginal.csv',header=True)
subs.printSchema()

root
 |-- id: string (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- jobtitle: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [None]:
subs = subs.withColumn('id',subs['id'].cast('integer'))\
           .withColumn('salary',subs['salary'].cast("float"))\
           .withColumn('latitude',subs['latitude'].cast("float"))\
           .withColumn('longitude',subs['longitude'].cast("float"))
subs.dtypes

[('id', 'int'),
 ('fname', 'string'),
 ('lname', 'string'),
 ('gender', 'string'),
 ('city', 'string'),
 ('jobtitle', 'string'),
 ('salary', 'float'),
 ('latitude', 'float'),
 ('longitude', 'float')]

In [None]:
# # now using 'subs' df to work on groupBy()
gsubs = subs.groupBy('jobtitle')
gsum = gsubs.sum('salary')
gsum.show(truncate=False) # sum of all their salaries grouped by jobtitles

+-----------------------------+----------------+
|jobtitle                     |sum(salary)     |
+-----------------------------+----------------+
|Systems Administrator II     |264525.69921875 |
|Media Manager III            |140905.0703125  |
|Recruiting Manager           |367391.685546875|
|Geologist III                |133739.40234375 |
|Geologist II                 |86587.73046875  |
|Database Administrator IV    |52018.4609375   |
|Financial Analyst            |629598.33203125 |
|Analyst Programmer           |374490.921875   |
|Software Engineer II         |74782.640625    |
|Accountant IV                |165464.49609375 |
|Product Engineer             |622904.2734375  |
|Software Test Engineer II    |356046.427734375|
|Safety Technician III        |29421.529296875 |
|Junior Executive             |391575.3046875  |
|Systems Administrator III    |154118.4375     |
|Human Resources Assistant III|162527.759765625|
|VP Marketing                 |364954.279296875|
|Environmental Tech 

In [None]:
gcount = gsubs.count()
gcount.show()

In [None]:
# finding avg sal of each jobtitle
gavg = gsubs.avg('salary')
gavg.show()

In [None]:
#avg,max and min salary of each jobtitle
masal = gsubs.max('salary')
masal.show(truncate=False)
#Systems Administrator II     94859.08

In [None]:
fsubs = subs.filter((subs['jobtitle'] == 'Systems Administrator II'))
fsubs.show(truncate=False)

+---+----------+-----------+------+--------------+------------------------+--------+---------+-----------+
|id |first_name|last_name  |gender|City          |JobTitle                |salary  |latitude |longitude  |
+---+----------+-----------+------+--------------+------------------------+--------+---------+-----------+
|185|Lezlie    |Tregea     |Female|El Salitre    |Systems Administrator II|94859.08|20.098057|-101.502785|
|881|Frieda    |Castelluzzi|Female|Calebasses    |Systems Administrator II|77236.36|-20.11743|57.556107  |
|956|Kinny     |Salmen     |Male  |Cergy-Pontoise|Systems Administrator II|62163.8 |49.039078|2.074954   |
|961|Faun      |Jamieson   |Female|Bestala       |Systems Administrator II|30266.46|-8.245185|114.97245  |
+---+----------+-----------+------+--------------+------------------------+--------+---------+-----------+



In [None]:
# min salary
minsal = gsubs.min('salary')
minsal.show(truncate=False) # works fine

In [None]:
# now finding all in 1 step
gaggdf = gsubs.agg(max('salary'),min('salary'),avg('salary'),mean('salary'),sum('salary'))
gaggdf.show()

+--------------------+-----------+-----------+------------------+------------------+----------------+
|            jobtitle|max(salary)|min(salary)|       avg(salary)|       avg(salary)|     sum(salary)|
+--------------------+-----------+-----------+------------------+------------------+----------------+
|Systems Administr...|   94859.08|   30266.46|  66131.4248046875|  66131.4248046875| 264525.69921875|
|   Media Manager III|   70306.29|   18453.02|46968.356770833336|46968.356770833336|  140905.0703125|
|  Recruiting Manager|   97121.81|   31690.72|61231.947591145836|61231.947591145836|367391.685546875|
|       Geologist III|   95247.14|   17134.96|    44579.80078125|    44579.80078125| 133739.40234375|
|        Geologist II|   47990.34|   38597.39|   43293.865234375|   43293.865234375|  86587.73046875|
|Database Administ...|   52018.46|   52018.46|     52018.4609375|     52018.4609375|   52018.4609375|
|   Financial Analyst|   93817.51|   26980.55|   62959.833203125|   62959.83320312

In [None]:
subs.show(5,truncate=False)

+---+--------+----------+------+-------------+----------------------------+--------+---------+----------+
|id |fname   |lname     |gender|city         |jobtitle                    |salary  |latitude |longitude |
+---+--------+----------+------+-------------+----------------------------+--------+---------+----------+
|1  |Melinde |Shilburne |Female|Nowa Ruda    |Assistant Professor         |57438.18|50.577408|16.496717 |
|2  |Kimberly|Von Welden|Female|Bulgan       |Programmer II               |62846.6 |48.82316 |103.52182 |
|3  |Alvera  |Di Boldi  |Female|null         |null                        |57576.52|39.994747|116.339775|
|4  |Shannon |O'Griffin |Male  |Divnomorskoye|Budget/Accounting Analyst II|61489.23|44.504723|38.130016 |
|5  |Sherwood|Macieja   |Male  |Mytishchi    |VP Sales                    |63863.09|null     |37.648994 |
+---+--------+----------+------+-------------+----------------------------+--------+---------+----------+
only showing top 5 rows



In [None]:
fsubs = subs.groupBy('jobtitle')
sor = fsubs.count()
sss = sor.sort('count',ascending=False)
sss.show()

+--------------------+-----+
|            jobtitle|count|
+--------------------+-----+
|Programmer Analyst I|    1|
|Research Assistant I|    1|
|Account Represent...|    1|
|Database Administ...|    1|
|         Engineer II|    1|
|    Health Coach III|    1|
|Budget/Accounting...|    1|
|     Media Manager I|    1|
|        Geologist IV|    1|
|Office Assistant III|    1|
|Programmer Analys...|    1|
|Computer Systems ...|    1|
|Safety Technician...|    1|
|Safety Technician II|    1|
|Database Administ...|    1|
|Database Administ...|    1|
| Safety Technician I|    1|
|Software Test Eng...|    1|
|      Statistician I|    1|
|Software Engineer II|    1|
+--------------------+-----+
only showing top 20 rows



In [None]:
# sorting a df based on salary range
sortdf = subs.sort('salary',ascending=True)
sortdf.show()

In [None]:
#using alias()
fullname = subs.select(col('city').alias('ncity'))
fullname.show()

+---------------+
|          ncity|
+---------------+
|      Nowa Ruda|
|         Bulgan|
|           null|
|  Divnomorskoye|
|      Mytishchi|
|Kinsealy-Drinan|
|         Dachun|
|      Trélissac|
|         Heitan|
|       Arbeláez|
|       El Cardo|
|    Wangqingtuo|
|      Sułkowice|
|    Springfield|
|         Chrást|
|         Xijiao|
|      Mieścisko|
| Foros do Trapo|
|    Jabungsisir|
|          Pedra|
+---------------+
only showing top 20 rows



# YT df.na.drop()

In [None]:
ytdf = spark.read.csv('/content/drive/MyDrive/ocsv/nall.csv',header=True,inferSchema=True)
ytdf.printSchema() #inferSchema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- gender: string (nullable = true)



In [None]:
ytdf.show()

+---+-------+----------+------+------+
| id|   name|department|salary|gender|
+---+-------+----------+------+------+
|  1|   John|       Doe| 50000|  Male|
|  2|   Jane|     Smith|  null|Female|
|  3| Robert|      null| 65000|  null|
|  4|   Mary|   Johnson| 56000|Female|
|  5|Michael|      null| 75000|  Male|
|  6|  Susan|      null| 50000|Female|
|  7|  David|    Miller|  null|  Male|
|  8|  Linda|  Williams| 72000|  null|
|  9|William|  Anderson|  null|  Male|
| 10|  Emily|      null| 55000|Female|
+---+-------+----------+------+------+



In [None]:
# na.drop(how='any') -> If 'any', drop a row if it contains any nulls.
# na.drop(how='all')If 'all', drop a row only if all its values are null.
# thresh: int, optional
nadf=ytdf.na.drop(how='any')
nadf.show()

+---+----+----------+------+------+
| id|name|department|salary|gender|
+---+----+----------+------+------+
|  1|John|       Doe| 50000|  Male|
|  4|Mary|   Johnson| 56000|Female|
+---+----+----------+------+------+



In [None]:
nalldf = ytdf.na.drop(how='all')
nalldf.show() #drops only if all row vals are null otherwise not effect

+---+-------+----------+------+------+
| id|   name|department|salary|gender|
+---+-------+----------+------+------+
|  1|   John|       Doe| 50000|  Male|
|  2|   Jane|     Smith|  null|Female|
|  3| Robert|      null| 65000|  null|
|  4|   Mary|   Johnson| 56000|Female|
|  5|Michael|      null| 75000|  Male|
|  6|  Susan|      null| 50000|Female|
|  7|  David|    Miller|  null|  Male|
|  8|  Linda|  Williams| 72000|  null|
|  9|William|  Anderson|  null|  Male|
| 10|  Emily|      null| 55000|Female|
+---+-------+----------+------+------+



In [None]:
nallth=nalldf.na.drop(how='any',thresh=3)
nallth.show()

+---+-------+----------+------+------+
| id|   name|department|salary|gender|
+---+-------+----------+------+------+
|  1|   John|       Doe| 50000|  Male|
|  2|   Jane|     Smith|  null|Female|
|  3| Robert|      null| 65000|  null|
|  4|   Mary|   Johnson| 56000|Female|
|  5|Michael|      null| 75000|  Male|
|  6|  Susan|      null| 50000|Female|
|  7|  David|    Miller|  null|  Male|
|  8|  Linda|  Williams| 72000|  null|
|  9|William|  Anderson|  null|  Male|
| 10|  Emily|      null| 55000|Female|
+---+-------+----------+------+------+



In [None]:
#subset takes col name input and dels that row when col has null value
nadsubs = ytdf.na.drop(how='any',subset=['department'])
nadsubs.show() # returns null-less dept col

+---+-------+----------+------+------+
| id|   name|department|salary|gender|
+---+-------+----------+------+------+
|  1|   John|       Doe| 50000|  Male|
|  2|   Jane|     Smith|  null|Female|
|  4|   Mary|   Johnson| 56000|Female|
|  7|  David|    Miller|  null|  Male|
|  8|  Linda|  Williams| 72000|  null|
|  9|William|  Anderson|  null|  Male|
+---+-------+----------+------+------+



In [None]:
# filling the null vals at all cols & rows
alnulldf = ytdf.na.fill('no-null')
alnulldf.show()

+---+-------+----------+------+-------+
| id|   name|department|salary| gender|
+---+-------+----------+------+-------+
|  1|   John|       Doe| 50000|   Male|
|  2|   Jane|     Smith|  null| Female|
|  3| Robert|   no-null| 65000|no-null|
|  4|   Mary|   Johnson| 56000| Female|
|  5|Michael|   no-null| 75000|   Male|
|  6|  Susan|   no-null| 50000| Female|
|  7|  David|    Miller|  null|   Male|
|  8|  Linda|  Williams| 72000|no-null|
|  9|William|  Anderson|  null|   Male|
| 10|  Emily|   no-null| 55000| Female|
+---+-------+----------+------+-------+



In [None]:
# fill at specific cols
spenuldf = ytdf.na.fill('NA',['gender'])
spenuldf.show() # fills NA at gender nulls

+---+-------+----------+------+------+
| id|   name|department|salary|gender|
+---+-------+----------+------+------+
|  1|   John|       Doe| 50000|  Male|
|  2|   Jane|     Smith|  null|Female|
|  3| Robert|      null| 65000|    NA|
|  4|   Mary|   Johnson| 56000|Female|
|  5|Michael|      null| 75000|  Male|
|  6|  Susan|      null| 50000|Female|
|  7|  David|    Miller|  null|  Male|
|  8|  Linda|  Williams| 72000|    NA|
|  9|William|  Anderson|  null|  Male|
| 10|  Emily|      null| 55000|Female|
+---+-------+----------+------+------+

