In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,ArrayType,MapType
from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder.appName("create_df_with_diff_datatype").getOrCreate()

In [0]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]
#Create schema to pass while creating DataFrame
schema = StructType([
    StructField("name", StringType(), True),
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True) 
    ])
#pass the data and schema
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|  Michael,,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



In [0]:
#Pyspark Write DataFrame to Parquet file format
df.write.parquet("/tmp/output/sample.parquet")
#Pyspark Read Parquet file into DataFrame
parDF=spark.read.parquet("/tmp/output/sample.parquet")
parDF.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-876351177773483>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0;31m#Pyspark Write DataFrame to Parquet file format[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mparquet[0m[0;34m([0m[0;34m"/tmp/output/sample.parquet"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      3[0m [0;31m#Pyspark Read Parquet file into DataFrame[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0mparDF[0m[0;34m=[0m[0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mparquet[0m[0;34m([0m[0;34m"/tmp/output/sample.parquet"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0mparDF[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/ins

In [0]:
#Import explode
from pyspark.sql.functions import explode
df.select(df.name).show()



In [0]:
#explode() function to create a new row for each element
df1 = df.select(df.name,explode(df.languagesAtSchool).alias("knownLanguagesatSchool"))
df1.show()



In [0]:
#Check how to create array of multiple value of one columns
#import split()
from pyspark.sql.functions import split



In [0]:
#split() sql function returns an array type after splitting the string column by delimiter
#Here we split the name column by comma delimiter
df3 = df.select(split(df.name,",").alias("nameArray"))
df3.show()



In [0]:
#create array of two columns value
#import array
from pyspark.sql.functions import array
#keep two column currentState and PreviousState in common column(State)
df.select(array(df.currentState,df.previousState).alias("States")).show()



In [0]:
#check if that array contains something or not
#import array_contains
from pyspark.sql.functions import array_contains
df.select(df.name,array_contains(df.languagesAtSchool,"Java")
    .alias("array_contains")).show()



In [0]:
#data with array that contains string & dictionary inside
dataDictionary = [
        ('James',{'hair':'black','skintone':'black'}),
        ('Michael',{'hair':'brown'}),
        ('Robert',{'hair':'red'}),
        ('Washington',{'hair':'grey'}),
        ('Jefferson',{'hair':'brown'})
        ]
schema = StructType([
    StructField("name", StringType(), True),
    StructField('properties', MapType(StringType(),StringType()), True)
    ])
#Create dataFrame
df4 = spark.createDataFrame(dataDictionary, schema = schema)
df4.printSchema()
df4.show()



In [0]:
#use explode to separate key & value in diff column
df5 = df4.select(df4.name,explode(df4.properties))
df5.printSchema()
df5.show()



In [0]:
#Import map_keys and map_values
from pyspark.sql.functions import map_keys, map_values

#Used map_keys function to get dataframe with keys of column properties
df4_keys = df4.select(df4.name,map_keys('properties').alias('keys of properties'))
df4_keys.show()



In [0]:
#Used map_keys function to get dataframe with values of column properties
df4_values = df4.select(map_values('properties').alias('keys'))
df4_values.show()



In [0]:
df4.select(df4.name,explode_outer(df4.properties)).show()




In [0]:
# Create a Row Object
#Import Row
from pyspark.sql import Row 
#We can retrieve the data from Row using index
row = Row("Junee", "Shrestha", "Newar")
print(row[0])
print(row[1])
print(row[2])
print(f"{row[0]} {row[1]} is a {row[2]}")




In [0]:
#We can also write with named arguments so that we can access with field name row.name
row=Row(name="Anam", age=11)
print(row.name) 



In [0]:
#Create Custom Class from Row

Person = Row("name", "age")
p1=Person("Junee", 40)
p2=Person("Sunee", 35)
print(p1.name +","+p2.name)



In [0]:
#Creating Rdd from given dataset
data = [("James,,Smith", ["Java","Scala","C++"], "CA"), 
("Michael,Rose,", ["Spark","Java","C++"], "NJ"),
("Robert,,Williams", ["CSharp","VB"] ,"NV")]
rdd = spark.sparkContext.parallelize(data)
rdd.collect()




In [0]:
#Convert rdd to dataframe using row
from pyspark.sql import Row 
df = rdd.map(lambda x: Row(name=x[0], langauge=x[1], position=x[2])).toDF()
df.show()



In [0]:
#Given dataset to create dataframe
from pyspark.sql.types import StructType, StructField, StringType
all_data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]
#Create schema to pass while creating DataFrame
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("country", StringType(), True), 
    StructField("state", StringType(), True) 
    ])
all_data_df = spark.createDataFrame(all_data, schema)
all_data_df.show()



In [0]:
all_data_df.select(['firstname', 'country']).show()



In [0]:
#read csv file
titanic_df = spark.read.csv("/FileStore/tables/sample5.csv", header=True, inferSchema=True)
titanic_df.show()



In [0]:
#col() function to select columns 2 to 6 
#select() method to select only columns 2 to 6
#limit() method to limit the output to the first 6 rows
from pyspark.sql.functions import col
titanic_df1 = titanic_df.select(col("Survived"), col("Pclass"), col("Name"), col("Sex"), col("Age")).limit(6)
titanic_df1.show()
titanic_df1.select(titanic_df1.columns[1:7]).show(6)




In [0]:
#Create a file which store data in partition of year
data2 = [(2012,8,"Batman",9.8),
           (2012,8,"Hero",8.7),
           (2012,7,"Robot",5.5),
           (2011,7,"git",2.0)
  ]
columns = ["year","month","title","rating"]
df_yearwise = spark.createDataFrame(data2, columns) 
df_yearwise.show()



In [0]:
df_yearwise.write.partitionBy("year").mode("overwrite").parquet("/path/to/parquet7_file")
data_yearwise_partition2 = spark.read.parquet("/path/to/parquet7_file")
data_yearwise_partition2.show()
data_yearwise_partition2012 = spark.read.parquet("/path/to/parquet7_file").where(col('year')==2012)
data_yearwise_partition2012.show()



In [0]:
df_yearwise.write.partitionBy("year") \
        .format("avro").save("/FileStore/tables/yearwise_partitions.avro")




In [0]:
df_yearwise2 = spark.read.format("avro").load("/FileStore/tables/yearwise_partitions.avro")
df_yearwise2.show()



In [0]:
#partition interms of month and year
df_yearwise.write.option("header",True) \
        .partitionBy("year","month") \
        .parquet("/FileStore/tables/year_month_partitions")



In [0]:
df_year_monthwise = spark.read.parquet("/FileStore/tables/year_month_partitions")
df_year_monthwise.show()



In [0]:
data = {'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35]}

df6 = pd.DataFrame(data)
df6.show()

