In [0]:
from pyspark.sql.types import *

# Read multiline json file 1
data_df = spark.read.option("multiline","false") \
      .json("/FileStore/tables/JSON/examples.json")
display(data_df) 

_id,calories_per_serving,cook_time,desc,directions,ingredients,likes,likes_count,prep_time,rating,rating_avg,servings,tags,title,type
List(5ee69d943260aab97ea0d58d),,,,,,,,,,,,,Pizza,
List(5ee69e393260aab97ea0d58e),,,,,,,,,,,,,Delete me,
List(5e5e9c470d33e9e8e3891b35),210.0,20.0,Classic Mexican tacos,"List(Brown beef, Add taco seasoning and water, mix, Bring to boil, Lower heat to simmer 5-10 minutes until desired consistency, Put meat in tacos)","List(List(ground beef (lean), List(1, lbs)), List(taco seasoning, List(2, oz)), List(corn hard tacos, List(12, oz)))","List(1, 415)",2.0,10.0,"List(4, 4, 3, 4, 2, 5, 2, 2, 4, 5)",3.5,4.0,"List(mexican, quick, easy, ground beef)",Tacos,Dinner


In [0]:
# directions columns contains Array lets try to flat it out...

#Using SQL col() function
from pyspark.sql.functions import col
data_df2= data_df["title","directions"].filter(col("title")=="Tacos")

display(data_df2)

from pyspark.sql.functions import explode
display(  data_df2.select(data_df2.title,explode(data_df2.directions))  ) # explode function used for flat it out...

data_df3=data_df2.select(data_df2.title,explode(data_df2.directions))

title,directions
Tacos,"List(Brown beef, Add taco seasoning and water, mix, Bring to boil, Lower heat to simmer 5-10 minutes until desired consistency, Put meat in tacos)"


title,col
Tacos,Brown beef
Tacos,"Add taco seasoning and water, mix"
Tacos,Bring to boil
Tacos,Lower heat to simmer 5-10 minutes until desired consistency
Tacos,Put meat in tacos


In [0]:
#Use array() function to create a new array column by merging the data from multiple columns.

from pyspark.sql.functions import array

display( data_df3.select(data_df3.title,array(data_df3.title,data_df3.col).alias("Dummy_array")) )

#display( data_df3.select(data_df3.title,array(data_df3.title,data_df3.col)[1].alias("Dummy_array")) )


title,Dummy_array
Tacos,"List(Tacos, Brown beef)"
Tacos,"List(Tacos, Add taco seasoning and water, mix)"
Tacos,"List(Tacos, Bring to boil)"
Tacos,"List(Tacos, Lower heat to simmer 5-10 minutes until desired consistency)"
Tacos,"List(Tacos, Put meat in tacos)"


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

data_schema = StructType(
  [
    StructField("Title",StringType(),True),
    StructField("Desc",ArrayType(StringType()),True)
  ]
)



# using StructType grammar we need a list as input data  thats why below conversion needed
import numpy as np
x=(data_df2.collect()) # collect retrieves all elements in a DataFrame as an Array
data_df4 = spark.createDataFrame(data=x,schema=data_schema) # we use Array x as data input -- we cant use another dataframe as input
data_df4.printSchema()
#display(data_df4)

print(x)
print(data_df4)

root
 |-- Title: string (nullable = true)
 |-- Desc: array (nullable = true)
 |    |-- element: string (containsNull = true)

[Row(title='Tacos', directions=['Brown beef', 'Add taco seasoning and water, mix', 'Bring to boil', 'Lower heat to simmer 5-10 minutes until desired consistency', 'Put meat in tacos'])]
DataFrame[Title: string, Desc: array<string>]


In [0]:

a1= data_df3.select(col("Title")).toPandas()['Title'].tolist() # instead of using collect to convert , using toPandas and then using tolist to convert from dataframe to list

print(type(a1))

# using collect func. bcz of retrieving all data , it can cause out of memory error in the case of big dataset

<class 'list'>


In [0]:
display(data_df3)

title,col
Tacos,Brown beef
Tacos,"Add taco seasoning and water, mix"
Tacos,Bring to boil
Tacos,Lower heat to simmer 5-10 minutes until desired consistency
Tacos,Put meat in tacos


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

data_schema = StructType(
  [
    StructField("Title",StringType(),True),
    StructField("Desc",StringType(),True)
  ]
)


y=data_df3.select(col("Title"),col('col')).toPandas().values.tolist() # instead of collect we use toPandas and values to covert dataframe into a list
data_df4 = spStringTypeark.createDataFrame(data=y,schema=data_schema) # we use Array x as data input -- we cant use another dataframe as input
data_df4.printSchema()
#display(data_df4)

print(x)
display(data_df4)

root
 |-- Title: string (nullable = true)
 |-- Desc: string (nullable = true)

[Row(title='Tacos', directions=['Brown beef', 'Add taco seasoning and water, mix', 'Bring to boil', 'Lower heat to simmer 5-10 minutes until desired consistency', 'Put meat in tacos'])]


Title,Desc
Tacos,Brown beef
Tacos,"Add taco seasoning and water, mix"
Tacos,Bring to boil
Tacos,Lower heat to simmer 5-10 minutes until desired consistency
Tacos,Put meat in tacos


In [0]:
#using MapType  as input
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, MapType

schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(),StringType()),True)
])

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'grey','eye':'grey'}),
        ('Jefferson',{'hair':'brown','eye':''})
        ]
df = spark.createDataFrame(data=dataDictionary, schema = schema)
df.printSchema()
df.show(truncate=False)


display( df.select(col('name') ,col('properties').eye.alias("eye") , col('properties').hair.alias("hair"))  )

root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|name      |properties                   |
+----------+-----------------------------+
|James     |{eye -> brown, hair -> black}|
|Michael   |{eye -> null, hair -> brown} |
|Robert    |{eye -> black, hair -> red}  |
|Washington|{eye -> grey, hair -> grey}  |
|Jefferson |{eye -> , hair -> brown}     |
+----------+-----------------------------+



name,eye,hair
James,brown,black
Michael,,brown
Robert,black,red
Washington,grey,grey
Jefferson,,brown


In [0]:
#for loop implementation
print(type(df))
df2=df.select(col('name') ,col('properties').eye.alias("eye") , col('properties').hair.alias("hair"))
print(type(df2))
df3=df2.toPandas()
print(type(df3))

for index, row in df3.iterrows(): # itterrow only works with pandas.dataframe
    print("\n")
    print(index)
    print(row['name'], row['hair'])

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


0
James black


1
Michael brown


2
Robert red


3
Washington grey


4
Jefferson brown


In [0]:
x=(df2.collect())  # collect function can work with <class 'pyspark.sql.dataframe.DataFrame'>

#y=(df3.collect())  # if type is <class 'pandas.core.frame.DataFrame'> we cant use collect function

display(df2.sample(0.20) ) # to get 20% sample records

name,eye,hair


In [0]:
# working on Parquet File 

#Apache Parquet file is a columnar storage format available to any project in the Hadoop ecosystem
#While querying columnar storage, it skips the nonrelevant data very quickly, making faster query execution. 
#As a result aggregation queries consume less time compared to row-oriented databases.



data =[("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),
              ("Jen","Mary","Brown","","F",-1)]
columns=["firstname","middlename","lastname","dob","gender","salary"]
dfp=spark.createDataFrame(data,columns)

# write into parq. file
dfp.write.mode('overwrite').parquet("/tmp/output/people.parquet")

# read from parq.
parDF=spark.read.parquet("/tmp/output/people.parquet")


display(parDF)

#execute as sql

parDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

#CREATE TABLE USING PARQ. FILE
spark.sql("CREATE or REPLACE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/tmp/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()


#CREATE PARTITIONED PARQ. file
dfp.write.partitionBy("gender","salary").mode("overwrite").parquet("/tmp/output/people2.parquet")


firstname,middlename,lastname,dob,gender,salary
Robert,,Williams,42114.0,M,4000
Maria,Anne,Jones,39192.0,F,4000
Michael,Rose,,40288.0,M,4000
James,,Smith,36636.0,M,3000
Jen,Mary,Brown,,F,-1


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [0]:
#to_json() function is used to convert DataFrame columns MapType or Struct type to JSON string,,


#display(df)

print( df.printSchema()  )

from pyspark.sql.functions import to_json,col

df.withColumn("properties",to_json(col("properties"))).show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

None
+----------+------------------------------+
|name      |properties                    |
+----------+------------------------------+
|James     |{"eye":"brown","hair":"black"}|
|Michael   |{"eye":null,"hair":"brown"}   |
|Robert    |{"eye":"black","hair":"red"}  |
|Washington|{"eye":"grey","hair":"grey"}  |
|Jefferson |{"eye":"","hair":"brown"}     |
+----------+------------------------------+



In [0]:
#overlay() Function
#Replace column value with a string value from another column.

from pyspark.sql.functions import overlay
df = spark.createDataFrame([("ABCDE_XYZ", "FGH")], ("col1", "col2"))
df.select(overlay("col1", "col2",0).alias("overlayed")).show()
df.select(overlay("col1", "col2",1).alias("overlayed")).show()
df.select(overlay("col1", "col2",6).alias("overlayed")).show()


x=df.select(overlay("col1", "col2",6).alias("overlayed")).toPandas().values.tolist() 

print(type(x))

+----------+
| overlayed|
+----------+
|FGHCDE_XYZ|
+----------+

+---------+
|overlayed|
+---------+
|FGHDE_XYZ|
+---------+

+---------+
|overlayed|
+---------+
|ABCDEFGHZ|
+---------+

<class 'list'>


In [0]:
# read json example 2
from pyspark.sql.types import *

# Read multiline json file 1
data_df = spark.read.option("multiline","true").json("/FileStore/tables/JSON/Ex1.json")
display(data_df) 

print(data_df.printSchema() )


batters,id,name,ppu,topping,type
"List(List(List(1001, Regular), List(1002, Chocolate), List(1003, Blueberry), List(1004, Devil's Food)))",1,Cake,0.55,"List(List(5001, None), List(5002, Glazed), List(5005, Sugar), List(5007, Powdered Sugar), List(5006, Chocolate with Sprinkles), List(5003, Chocolate), List(5004, Maple))",donut
"List(List(List(1001, Regular)))",2,Raised,0.55,"List(List(5001, None), List(5002, Glazed), List(5005, Sugar), List(5003, Chocolate), List(5004, Maple))",donut
"List(List(List(1001, Regular), List(1002, Chocolate)))",3,Old Fashioned,0.55,"List(List(5001, None), List(5002, Glazed), List(5003, Chocolate), List(5004, Maple))",donut


root
 |-- batters: struct (nullable = true)
 |    |-- batter: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ppu: double (nullable = true)
 |-- topping: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)

None


In [0]:
#using MapType  as input
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, MapType

schema = StructType([
  StructField('id',StringType()),
  StructField('batters', StructType([
    
        StructField(
        'batter', ArrayType(
            StructType([
                StructField('id', StringType(), True),
                StructField('type',StringType() , True) 
               
                
            ])
        )
    )
    
    
  ])  )
])

dfx=data_df[["id","batters"]].toPandas().values.tolist()


df = spark.createDataFrame(data=dfx, schema = schema)
display(   df.select(col("id"),col("batters").batter.id,col("batters").batter.type    )   )


id,batters.batter.id,batters.batter.type
1,"List(1001, 1002, 1003, 1004)","List(Regular, Chocolate, Blueberry, Devil's Food)"
2,List(1001),List(Regular)
3,"List(1001, 1002)","List(Regular, Chocolate)"
