# PySpark Tutorial

In [1]:
# First Load all the required library and also Start Spark Session
# Load all the required library
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, array_contains

In [2]:
#Запуск Spark Session
spark=SparkSession.builder.config("spark.driver.host", "localhost").appName('appname').getOrCreate()
sqlContext = SparkSession(spark)
#Dont Show warning only error
spark.sparkContext.setLogLevel("ERROR")

3. Read CSV file in to Dataframe using PySpark

In [4]:
#Load CSV file into DataFrame
csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("lenta-ru-partial.csv")

Write DataFrame into CSV file using PySpark

In [None]:
csvdf.write.csv("zipcodes")#options:
#modestr, optional
# specifies the behavior of the save operation when data already exists.
# append: Append contents of this DataFrame to existing data.
# overwrite: Overwrite existing data.
# ignore: Silently ignore this operation if data already exists.
# error or errorifexists (default case): Throw an exception if data already
# exists

Read json file into DataFrame using Pyspark

In [5]:
# Read JSON file into dataframe
df = spark.read.json("zipcodes.json")
df.printSchema()
df.show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/17867234/PySpark/zipcodes.json.

In [None]:
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json') \
        .load("resources/zipcodes.json")

In [None]:
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
      .json("resources/multiline-zipcode.json")
multiline_df.show() 

In [None]:
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()

In [None]:
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()

Write DataFrame into json file using PySpark

In [4]:
csvdf.write.format('json').save('flights1.json')#mode=

Read&write Parquet file into Dataframe using PySpark

In [5]:
# Read and Write Parquet file using parquet()
df.write.parquet("people.parquet") 
parDF1=spark.read.parquet("people.parquet")
# Using append and overwrite to save parquet file
df.write.mode('append').parquet("/tmp/output/people.parquet")
df.write.mode('overwrite').parquet("/tmp/output/people.parquet")

NameError: name 'df' is not defined

withColumn() in PySpark

In [6]:
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumn('age2', df.age + 2).show()

+---+-----+----+
|age| name|age2|
+---+-----+----+
|  2|Alice|   4|
|  5|  Bob|   7|
+---+-----+----+



withColumnRenamed() usage in PySpark

In [7]:
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()

+----+-----+
|age2| name|
+----+-----+
|   2|Alice|
|   5|  Bob|
+----+-----+



StructType() & StructField() in PySpark 

In [8]:
# Imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
# df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [9]:
from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(),False)
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

from pyspark.sql.types import StringType, ArrayType,StructType,StructField
schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True)
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()


root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)



In [10]:
df.show()

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



In [11]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

from pyspark.sql.types import StringType, ArrayType,StructType,StructField
schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True)
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



In [12]:
#explode() function - se explode() function to create a new row for each element in the given array column. 
from pyspark.sql.functions import explode
from pyspark.sql import Row
df = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
df.select(explode(df.intlist).alias("anInt")).collect()


[Row(anInt=1), Row(anInt=2), Row(anInt=3)]

In [7]:
#split function - PySpark split() Column into Multiple Columns
from pyspark.sql.functions import split

In [14]:
spark=SparkSession.builder.config("spark.driver.host", "localhost").appName('appname').getOrCreate()

In [5]:
#spark = SparkSession.builder.appName("sparkbyexamples").getOrCreate()
data=data = [('James','','Smith','1991-04-01'),
  ('Michael','Rose','','2000-05-19'),
  ('Robert','','Williams','1978-09-05'),
  ('Maria','Anne','Jones','1967-12-01'),
  ('Jen','Mary','Brown','1980-02-17')
]

columns=["firstname","middlename","lastname","dob"]
df=spark.createDataFrame(data,columns)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)

+---------+----------+--------+----------+
|firstname|middlename|lastname|dob       |
+---------+----------+--------+----------+
|James    |          |Smith   |1991-04-01|
|Michael  |Rose      |        |2000-05-19|
|Robert   |          |Williams|1978-09-05|
|Maria    |Anne      |Jones   |1967-12-01|
|Jen      |Mary      |Brown   |1980-02-17|
+---------+----------+--------+----------+



In [8]:
# split() with withColumn
df1 = df.withColumn('year', split(df['dob'], '-').getItem(0)) \
       .withColumn('month', split(df['dob'], '-').getItem(1)) \
       .withColumn('day', split(df['dob'], '-').getItem(2))
df1.show(truncate=False)

+---------+----------+--------+----------+----+-----+---+
|firstname|middlename|lastname|dob       |year|month|day|
+---------+----------+--------+----------+----+-----+---+
|James    |          |Smith   |1991-04-01|1991|04   |01 |
|Michael  |Rose      |        |2000-05-19|2000|05   |19 |
|Robert   |          |Williams|1978-09-05|1978|09   |05 |
|Maria    |Anne      |Jones   |1967-12-01|1967|12   |01 |
|Jen      |Mary      |Brown   |1980-02-17|1980|02   |17 |
+---------+----------+--------+----------+----+-----+---+



In [5]:
from pyspark.sql.functions import array, col, array_contains

In [11]:
df2 = df1.withColumn('year1', array(col('dob'), col('year')))

In [12]:
df2.show()

+---------+----------+--------+----------+----+-----+---+------------------+
|firstname|middlename|lastname|       dob|year|month|day|             year1|
+---------+----------+--------+----------+----+-----+---+------------------+
|    James|          |   Smith|1991-04-01|1991|   04| 01|[1991-04-01, 1991]|
|  Michael|      Rose|        |2000-05-19|2000|   05| 19|[2000-05-19, 2000]|
|   Robert|          |Williams|1978-09-05|1978|   09| 05|[1978-09-05, 1978]|
|    Maria|      Anne|   Jones|1967-12-01|1967|   12| 01|[1967-12-01, 1967]|
|      Jen|      Mary|   Brown|1980-02-17|1980|   02| 17|[1980-02-17, 1980]|
+---------+----------+--------+----------+----+-----+---+------------------+



In [3]:
import pyspark.sql.functions as f

df_updated = df.select(df.data, f.array_contains(df.data, "a").alias("contains_check"))

df_updated.show()

NameError: name 'df' is not defined

In [6]:
df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.select(array_contains(df.data, "a")).collect()

[Row(array_contains(data, a)=True), Row(array_contains(data, a)=False)]

In [13]:
df.filter(array_contains(df.data, "a")).show()

+---------+
|     data|
+---------+
|[a, b, c]|
+---------+



In [14]:
from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(),StringType()),True)
])

In [15]:
schema

StructType([StructField('name', StringType(), True), StructField('properties', MapType(StringType(), StringType(), True), True)])