In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

In [0]:
spark = SparkSession.builder.appName("ConvertToDataFrame").getOrCreate()

In [0]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

In [0]:
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Languageatschool", ArrayType(StringType()),True ),
    StructField("languageatwork", ArrayType(StringType()),True),
    StructField("Currentstate", StringType(), True),
    StructField("Previousstate", StringType(), True)
])
df_latest = spark.createDataFrame(data,schema)
df_latest.printSchema()
df_latest.show()

root
 |-- Name: string (nullable = true)
 |-- Languageatschool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languageatwork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Currentstate: string (nullable = true)
 |-- Previousstate: string (nullable = true)

+----------------+------------------+---------------+------------+-------------+
|            Name|  Languageatschool| languageatwork|Currentstate|Previousstate|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|  Michael,,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



In [0]:
#saving dataframe as csv
from pyspark.sql.functions import col, concat_ws
df2 = df_latest.withColumn("Languageatschool", concat_ws(",", col("languageatschool")))
df3 = df2.withColumn("languageatwork", concat_ws(",", col("languageatwork")))
df3.printSchema()
df3.show()

root
 |-- Name: string (nullable = true)
 |-- Languageatschool: string (nullable = false)
 |-- languageatwork: string (nullable = false)
 |-- Currentstate: string (nullable = true)
 |-- Previousstate: string (nullable = true)

+----------------+----------------+--------------+------------+-------------+
|            Name|Languageatschool|languageatwork|Currentstate|Previousstate|
+----------------+----------------+--------------+------------+-------------+
|    James,,Smith|  Java,Scala,C++|    Spark,Java|          OH|           CA|
|  Michael,,Rose,|  Spark,Java,C++|    Spark,Java|          NY|           NJ|
|Robert,,Williams|       CSharp,VB|  Spark,Python|          UT|           NV|
+----------------+----------------+--------------+------------+-------------+



In [0]:
df3.write.format("csv").option("header", "true").option("inferschema", "false").save("FileStore/tables/newdata.csv")


In [0]:
#1. We need to check how to create array of multiple value of one columns
from pyspark.sql.functions import split
df3.select(split(df3.Name,",").alias("nameAsArray")).show()

+--------------------+
|         nameAsArray|
+--------------------+
|    [James, , Smith]|
| [Michael, , Rose, ]|
|[Robert, , Williams]|
+--------------------+



In [0]:
#2.How to create array of two columns value
from pyspark.sql.functions import array
df3.select(df3.Name,array(df3.Currentstate,df3.Previousstate).alias("States")).show()

+----------------+--------+
|            Name|  States|
+----------------+--------+
|    James,,Smith|[OH, CA]|
|  Michael,,Rose,|[NY, NJ]|
|Robert,,Williams|[UT, NV]|
+----------------+--------+



In [0]:
#task3 check whether the column contains  "Java" or not and return the value
from pyspark.sql.functions import array_contains
df.select(df.name,array_contains(df["languageatwork"],"Java")
    .alias("value_contain")).show()

+----------------+-------------+
|            name|value_contain|
+----------------+-------------+
|    James,,Smith|         true|
|  Michael,,Rose,|         true|
|Robert,,Williams|        false|
+----------------+-------------+



In [0]:
data3=[("Nitesh",["Pyspark","databricks","Snowflake","DBT"])]
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Skills", ArrayType(StringType()), True)])
df4=spark.createDataFrame(data3,schema=schema)
df4.show()

+------+--------------------+
|  Name|              Skills|
+------+--------------------+
|Nitesh|[Pyspark, databri...|
+------+--------------------+



In [0]:
from pyspark.sql.functions import explode
df5 = df4.select(df4.Name,explode(df4.Skills))
df5.printSchema()
df5.show()

root
 |-- Name: string (nullable = true)
 |-- col: string (nullable = true)

+------+----------+
|  Name|       col|
+------+----------+
|Nitesh|   Pyspark|
|Nitesh|databricks|
|Nitesh| Snowflake|
|Nitesh|       DBT|
+------+----------+



In [0]:
#dictionary
dataDictionary = [
        ('James',{'hair':'black'}),
        ('Michael',{'hair':'brown'}),
        ('Robert',{'hair':'red'}),
        ('Washington',{'hair':'grey'}),
        ('Jefferson',{'hair':'brown'})
        ]
from pyspark.sql.types import StringType, MapType
mapCol = MapType(StringType(),StringType(),False)

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('Hair', MapType(StringType(),StringType()),True)
])

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dataDictionary = [
        ('James',{'hair':'black'}),
        ('Michael',{'hair':'brown'}),
        ('Robert',{'hair':'red'}),
        ('Washington',{'hair':'grey'}),
        ('Jefferson',{'hair':'brown'})
        ]
df = spark.createDataFrame(data=dataDictionary, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- Hair: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+---------------+
|name      |Hair           |
+----------+---------------+
|James     |{hair -> black}|
|Michael   |{hair -> brown}|
|Robert    |{hair -> red}  |
|Washington|{hair -> grey} |
|Jefferson |{hair -> brown}|
+----------+---------------+



In [0]:
from pyspark.sql.functions import explode
df.select(df.name,explode(df.Hair)).show()

+----------+----+-----+
|      name| key|value|
+----------+----+-----+
|     James|hair|black|
|   Michael|hair|brown|
|    Robert|hair|  red|
|Washington|hair| grey|
| Jefferson|hair|brown|
+----------+----+-----+



In [0]:
 from pyspark.sql import Row
 row = Row(name='roshan',age=24,salary=10000)
 print(row.name)

roshan


In [0]:
from pyspark.sql import Row
row=Row("name","height")
person = row('anish','5.6')
print(person.name)
print(person.height)

anish
5.6


In [0]:
#data
(“James,,Smith", ["Java","Scala","C++"], “CA"), 
(“Michael,Rose,", ["Spark","Java","C++"], “NJ"),
(“Robert,,Williams”,["CSharp","VB"] ,”NV")

In [0]:
#creating RDD using row
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.appName('Row').getOrCreate()

data = [Row(name="James,,Smith",Languageatschool=["Java","Scala","C++"],state="CA"), 
    Row(name="Michael,Rose,",Languageatschool=["Spark","Java","C++"],state="NJ"),
    Row(name="Robert,,Williams",Languageatschool=["CSharp","VB"],state="NV")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())

[Row(name='James,,Smith', Languageatschool=['Java', 'Scala', 'C++'], state='CA'), Row(name='Michael,Rose,', Languageatschool=['Spark', 'Java', 'C++'], state='NJ'), Row(name='Robert,,Williams', Languageatschool=['CSharp', 'VB'], state='NV')]


In [0]:
#creating dataframe using row
df=spark.createDataFrame(data)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- Languageatschool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)

+----------------+------------------+-----+
|            name|  Languageatschool|state|
+----------------+------------------+-----+
|    James,,Smith|[Java, Scala, C++]|   CA|
|   Michael,Rose,|[Spark, Java, C++]|   NJ|
|Robert,,Williams|      [CSharp, VB]|   NV|
+----------------+------------------+-----+



In [0]:
#changing column name using df functions
columns = ["name","languagesAtSchool","currentState"]
df=spark.createDataFrame(data).toDF(*columns)
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)



In [0]:
df.show()

+----------------+------------------+------------+
|            name| languagesAtSchool|currentState|
+----------------+------------------+------------+
|    James,,Smith|[Java, Scala, C++]|          CA|
|   Michael,Rose,|[Spark, Java, C++]|          NJ|
|Robert,,Williams|      [CSharp, VB]|          NV|
+----------------+------------------+------------+



In [0]:
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]
columns = ["firstname","lastname","country","state"]
df=spark.createDataFrame(data, schema = columns)
df.printSchema()
df.show()


root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [0]:
df.select(df.firstname).show()

+---------+
|firstname|
+---------+
|    James|
|  Michael|
|   Robert|
|    Maria|
+---------+



In [0]:
df.select(df['country']).show()

+-------+
|country|
+-------+
|    USA|
|    USA|
|    USA|
|    USA|
+-------+



In [0]:
df = spark.read.csv('/FileStore/tables/MOCK_DATA-1.csv')

In [0]:
df.show()

+---+----------+----------+--------------------+------+--------------------+------------+----------+----------+---------------+
|_c0|       _c1|       _c2|                 _c3|   _c4|                 _c5|         _c6|       _c7|       _c8|            _c9|
+---+----------+----------+--------------------+------+--------------------+------------+----------+----------+---------------+
| id|first_name| last_name|               email|gender|                city|       phone|      date|  password|confirmpassword|
|  1|  Gauthier|  Taudevin|gtaudevin0@google.cn|  Male|Municipio de Copa...|456-347-6401| 7/18/2022|K4tDIv1QfJ|       Z9KrC3tr|
|  2|     Judah|  Sircombe|jsircombe1@chicag...|  Male|           Tapakrejo|575-376-0675|  1/7/2023|    8kdhrq|     ZftH4ElCFf|
|  3|    Perice|Camilletti|pcamilletti2@bbc....|  Male|            Den Haag|811-772-2893| 3/21/2023| P1IdX5Th5|         zrFfbs|
|  4|  Elizabet|   Blewmen|eblewmen3@blinkli...|Female|            Laveiras|687-217-6248| 6/26/2022| VYS

In [0]:
df.select(df.columns[1:7]).show(6)

+----------+----------+--------------------+------+--------------------+------------+
|       _c1|       _c2|                 _c3|   _c4|                 _c5|         _c6|
+----------+----------+--------------------+------+--------------------+------------+
|first_name| last_name|               email|gender|                city|       phone|
|  Gauthier|  Taudevin|gtaudevin0@google.cn|  Male|Municipio de Copa...|456-347-6401|
|     Judah|  Sircombe|jsircombe1@chicag...|  Male|           Tapakrejo|575-376-0675|
|    Perice|Camilletti|pcamilletti2@bbc....|  Male|            Den Haag|811-772-2893|
|  Elizabet|   Blewmen|eblewmen3@blinkli...|Female|            Laveiras|687-217-6248|
|  Etheline|    Garret|   egarret4@gmpg.org|Female|            Jandayan|966-407-1091|
+----------+----------+--------------------+------+--------------------+------------+
only showing top 6 rows



In [0]:
data2 = [(2012,8,"Batman",9.8),
           (2012,8,"Hero",8.7),
           (2012,7,"Robot",5.5),
           (2011,7,"git",2.0)
  ]
columns = ["year","month","title","rating"]
df=spark.createDataFrame(data2,schema = columns)
df.show()

+----+-----+------+------+
|year|month| title|rating|
+----+-----+------+------+
|2012|    8|Batman|   9.8|
|2012|    8|  Hero|   8.7|
|2012|    7| Robot|   5.5|
|2011|    7|   git|   2.0|
+----+-----+------+------+



In [0]:
df.write.option("header",True) \
        .partitionBy("year") \
        .parquet("/FileStore/tables/yearByPartitions.parquet")

In [0]:
df.write.partitionBy("year") \
        .format("avro").save("/FileStore/tables/person_partition1.avro")

In [0]:
df.write.option("header",True) \
        .partitionBy("year","month") \
        .csv("/FileStore/tables/person_partition1")