# DataFrame & SQL
## <font color=orange>Create</font>
* spark.read.option('header', true).option('').csv(path)
* spark.createDataFrame(data, schema)
### DataType
* ArrayType  
    - ex. StructField('name', ArrayType(StringType()), True)
* MapType: similar python dict  
    - ex. StructField('name', MapType(StringType(), StringType()), True) 

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Demo').getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
# empty RDD
emptyRDD = sc.emptyRDD()
emptyRDD = sc.parallelize([])
print("emptyRDD: ",emptyRDD)

# Create Schema
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, ArrayType, MapType


schema = StructType([
    StructField('firstname', StringType(), True),
    StructField('middlename', StringType(), True),
    StructField('lastname', StringType(), True)
])

# Convert Dataframe from emptyRDD
df = spark.createDataFrame(data=emptyRDD, schema=schema)
print('Convert DataFrame from emptyRDD')
df.printSchema()

# Create DataFrame from schema
df2 = spark.createDataFrame([], schema)
df2.printSchema()

# Create empty DF with no schema
df3 = spark.createDataFrame([],StructType([]))
df3.printSchema()

# convert DF to pandas
df_pandas = df.toPandas()
type(df_pandas), type(df)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/28 21:40:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/28 21:40:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
emptyRDD:  ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274
Convert DataFrame from emptyRDD
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)

root



                                                                                

(pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame)

In [3]:
from pyspark.sql.functions import col, struct, when, lit
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

updatedDF = df2.withColumn("OtherInfo", 
    struct(col("id").alias("identifier"),
    col("gender").alias("gender"),
    col("salary").alias("salary"),
    when(col("salary").cast(IntegerType()) < 2000,"Low")
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")
      .otherwise("High").alias("Salary_Grade")
  )).drop("id","gender","salary")
updatedDF.printSchema()
updatedDF.show(truncate=False)

import json
schemaFromJson = StructType.fromJson(json.loads(structureSchema.json()))
df3 = spark.createDataFrame(
    sc.parallelize(structureData), schemaFromJson)
df3.show()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|{James, , Smith}    |36636|M     |3100  |
|{Michael, Rose, }   |40288|M     |4300  |
|{Robert, , Williams}|42114|M     |1400  |
|{Maria, Anne, Jones}|39192|F     |5500  |
|{Jen, Mary, Brown}  |     |F     |-1    |
+--------------------+-----+------+------+

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- 

In [83]:
df3.schema.simpleString()

'struct<name:struct<firstname:string,middlename:string,lastname:string>,id:string,gender:string,salary:int>'

### Create/drop `TempView`
1. Create
  * Global: 
    ```python
    df.createOrReplaceGlobalTempView('TableName')
    spark.sql("select * from global_temp.TableName")
    ```
  * Local:  
    ```python
    df.createOrReplaceTempView('TableName')
    spark.sql("select * from TableName)
    ```
2. Drop
  * spark.catalog.dropGlobalTempView('TableName')
  * spark.catalog.dropTempView('TableName')

In [10]:
df2.createOrReplaceTempView('tmp_df2')
print(spark.sql('select * from tmp_df2 ').collect())
print(spark.catalog.dropTempView('tmp_df2'))

[Row(name=Row(firstname='James', middlename='', lastname='Smith'), id='36636', gender='M', salary=3100), Row(name=Row(firstname='Michael', middlename='Rose', lastname=''), id='40288', gender='M', salary=4300), Row(name=Row(firstname='Robert', middlename='', lastname='Williams'), id='42114', gender='M', salary=1400), Row(name=Row(firstname='Maria', middlename='Anne', lastname='Jones'), id='39192', gender='F', salary=5500), Row(name=Row(firstname='Jen', middlename='Mary', lastname='Brown'), id='', gender='F', salary=-1)]
None


## <font color=orange>Update</font>

### 新增欄位 
* 新增或替換現有的欄位 withColumn

In [137]:
from pyspark.sql import Row
from pyspark.sql import functions as F
#1 新增欄位輸入值 要用lit
df2.withColumn('new_col', lit(0)).show()

#2 依現有欄位條件輸入新欄位
df2.withColumn('new_co', when(df2.salary<0,'error'). \
                         when(df2.salary==1400, '1400'). \
                         when(df2.salary.between(0,2000),'0-2000'). \
                         otherwise('over2000')).show()
#3 新增一個list 轉為spark dataframe 再join
# 目前尚未找到類似pd.concat功能

id = F.monotonically_increasing_id()
row = Row('val', 'num')
lt = lt = [row(f'x{i}',i) for i in range(5)]
sc_lt = spark.sparkContext.parallelize(lt).toDF()
sc_lt = sc_lt.withColumn('matchID', id)
df2_new = df2.withColumn('matchID',id)
df3 = df2_new.join(sc_lt, df2_new.matchID == sc_lt.matchID, how='inner').drop('matchID')
df3 = df3.withColumn('newid', when(df3.id=='',None).otherwise(df3.id))

+--------------------+-----+------+------+-------+
|                name|   id|gender|salary|new_col|
+--------------------+-----+------+------+-------+
|    {James, , Smith}|36636|     M|  3100|      0|
|   {Michael, Rose, }|40288|     M|  4300|      0|
|{Robert, , Williams}|42114|     M|  1400|      0|
|{Maria, Anne, Jones}|39192|     F|  5500|      0|
|  {Jen, Mary, Brown}|     |     F|    -1|      0|
+--------------------+-----+------+------+-------+

+--------------------+-----+------+------+--------+
|                name|   id|gender|salary|  new_co|
+--------------------+-----+------+------+--------+
|    {James, , Smith}|36636|     M|  3100|over2000|
|   {Michael, Rose, }|40288|     M|  4300|over2000|
|{Robert, , Williams}|42114|     M|  1400|    1400|
|{Maria, Anne, Jones}|39192|     F|  5500|over2000|
|  {Jen, Mary, Brown}|     |     F|    -1|   error|
+--------------------+-----+------+------+--------+



### 條件篩選

In [145]:
# filter
df3.filter(df3.salary<0).show()
# where 
df3.where(df3.salary>0).show()
# between
df3.filter(df3.salary.between(3100,4400)).show()
# isin
df3[df3.salary.isin([3100,-1])].show()
# isnull(None) / isnan(非數字)
df3.filter(F.isnull('newid')).show()


+------------------+---+------+------+---+---+-----+
|              name| id|gender|salary|val|num|newid|
+------------------+---+------+------+---+---+-----+
|{Jen, Mary, Brown}|   |     F|    -1| x4|  4| null|
+------------------+---+------+------+---+---+-----+

+--------------------+-----+------+------+---+---+-----+
|                name|   id|gender|salary|val|num|newid|
+--------------------+-----+------+------+---+---+-----+
|    {James, , Smith}|36636|     M|  3100| x0|  0|36636|
|   {Michael, Rose, }|40288|     M|  4300| x1|  1|40288|
|{Robert, , Williams}|42114|     M|  1400| x2|  2|42114|
|{Maria, Anne, Jones}|39192|     F|  5500| x3|  3|39192|
+--------------------+-----+------+------+---+---+-----+

+-----------------+-----+------+------+---+---+-----+
|             name|   id|gender|salary|val|num|newid|
+-----------------+-----+------+------+---+---+-----+
| {James, , Smith}|36636|     M|  3100| x0|  0|36636|
|{Michael, Rose, }|40288|     M|  4300| x1|  1|40288|
+------

### <font color=orange>Rename column</font>

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M',4000),
  (('Maria','Anne','Jones'),'1967-12-01','F',4000),
  (('Jen','Mary','Brown'),'1980-02-17','F',-1)
]

schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df = spark.createDataFrame(data = dataDF, schema = schema)
df.printSchema()

# Example 1
df.withColumnRenamed("dob","DateOfBirth").printSchema()
# Example 2   
df2 = df.withColumnRenamed("dob","DateOfBirth") \
    .withColumnRenamed("salary","salary_amount")
df2.printSchema()

# Example 3 
schema2 = StructType([
    StructField("fname",StringType()),
    StructField("middlename",StringType()),
    StructField("lname",StringType())])
    
df.select(col("name").cast(schema2),
  col("dob"),
  col("gender"),
  col("salary")) \
    .printSchema()    

# Example 4 
df.select(col("name.firstname").alias("fname"),
  col("name.middlename").alias("mname"),
  col("name.lastname").alias("lname"),
  col("dob"),col("gender"),col("salary")) \
  .printSchema()
  
# Example 5
df4 = df.withColumn("fname",col("name.firstname")) \
      .withColumn("mname",col("name.middlename")) \
      .withColumn("lname",col("name.lastname")) \
      .drop("name")
df4.printSchema()

#Example 7
newColumns = ["newCol1","newCol2","newCol3","newCol4"]
df.toDF(*newColumns).printSchema()

## EDA

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Demo').getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/13 23:56:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
df = spark.read\
    .option('header',True)\
    .option('inferSchema',True)\
    .csv("./archive/CommentsFeb2018.csv")

                                                                                

In [15]:
df.printSchema()

root
 |-- approveDate: string (nullable = true)
 |-- articleID: string (nullable = true)
 |-- articleWordCount: string (nullable = true)
 |-- commentBody: string (nullable = true)
 |-- commentID: string (nullable = true)
 |-- commentSequence: string (nullable = true)
 |-- commentTitle: string (nullable = true)
 |-- commentType: string (nullable = true)
 |-- createDate: string (nullable = true)
 |-- depth: string (nullable = true)
 |-- editorsSelection: string (nullable = true)
 |-- inReplyTo: string (nullable = true)
 |-- newDesk: string (nullable = true)
 |-- parentID: string (nullable = true)
 |-- parentUserDisplayName: string (nullable = true)
 |-- permID: string (nullable = true)
 |-- picURL: string (nullable = true)
 |-- printPage: string (nullable = true)
 |-- recommendations: string (nullable = true)
 |-- recommendedFlag: string (nullable = true)
 |-- replyCount: string (nullable = true)
 |-- reportAbuseFlag: string (nullable = true)
 |-- sectionName: string (nullable = true)
 |

In [17]:
from pyspark.sql.functions import isnull, when, col, count

print('Rows: ', df.count())
print('Columns: ', len(df.columns))
print('Missing: ', df.select([
    count(when(isnull(c),c)).alias(c) for c in df.columns])\
        .toPandas().sum().sum()
    )
for c in df.columns:
    print(
        '{:15}: '.format(c), df.select(c).distinct().count(), '\t',
        [row[c] for row in df.select(c).distinct().collect()[:5]]
    )

Rows:  215284
Columns:  34
Missing:  951911


                                                                                

approveDate    :  158056 	 ['1517502691', '1517495440', '1517522426', '1517505368', '1517502937']
articleID      :  1157 	 ['5a73b2d710f40f00018bf15f', '5a75e4af10f40f00018bf58a', '5a7452df10f40f00018bf2b7', '5a749c4c10f40f00018bf38f', '5a730c8b10f40f00018bef28']
articleWordCount:  844 	 ['1159', '800', '1372', '591', '1445']
commentBody    :  214384 	 ["Please take a look at photos of the crowds in attendance at women's rights marches-they are NOT entirely young women, in fact, they are a mix of young, middle age, older and include plenty of husbands, fathers and sons. Nobody is marching because it's their last chance to do so w/o being criticized, rather we are marching to ensure the equal rights of all, and sadly today that doesn't include women, but if we keep working and marching and forcing change, it will. In your families case, the caged bird sings but we protestors want to sing in the trees and skies too.", '"Poor Donald, I\'m sure he\'s so very angry with himself now that he 