# PySpark Notes

These notes cover the basic operations of PySpark.

### Installation

Installation can be a little complicated for newcomers. The steps are:
- install Java (JDK 8 is preferred because of a known issue with JDK 11);
- install Hadoop:
  - download the Hadoop files;
  - configure Hadoop, HDFS, and YARN.
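
Before starting Spark, it can help to confirm that the prerequisites above are visible to Python. The sketch below is only a quick check, assuming the conventional executable names (`java`, `hadoop`) and environment variables (`JAVA_HOME`, `HADOOP_HOME`); the helper name `check_spark_prerequisites` is made up for this example.

```python
import os
import shutil


def check_spark_prerequisites():
    """Report whether Java and Hadoop look reachable from this environment.

    The executable and variable names are the conventional ones; adjust
    them if your installation differs.
    """
    return {
        "java_on_path": shutil.which("java") is not None,
        "JAVA_HOME": os.environ.get("JAVA_HOME"),
        "hadoop_on_path": shutil.which("hadoop") is not None,
        "HADOOP_HOME": os.environ.get("HADOOP_HOME"),
    }


for name, value in check_spark_prerequisites().items():
    print(f"{name}: {value}")
```

A `False` or `None` entry does not always mean the installation is broken, only that this Python process cannot see it.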

### Initializing Spark

In [2]:
from pyspark.sql import SparkSession

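# Create a SparkSession, the entry point that wraps a SparkContext
# Method 1
spark = SparkSession.builder.getOrCreate()
# Method 2: build a SparkContext directly ("notes" and "local[*]" are example values)
#from pyspark import SparkContext, SparkConf
#conf = SparkConf().setAppName("notes").setMaster("local[*]")
#sc = SparkContext(conf=conf)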

### Create data from array

#### RDD

Spark has two main data formats: RDD and DataFrame.
RDDs are designed for:
- low-level transformations and actions;
- unstructured data, such as media streams or streams of text;
- manipulating data with functional programming.

Otherwise, you should use a DataFrame.

Note: the Dataset and DataFrame APIs were unified in Spark 2.0.
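
To make the "functional programming" style concrete, the sketch below runs a filter/map/reduce pipeline with plain Python built-ins. It is a local stand-in rather than Spark code: the comments name the RDD methods that play the same role, and the input list is only an example.

```python
from functools import reduce

data = [1, 2, 3, 4, 5]

# RDD counterpart: rdd.filter(lambda x: x % 2 == 1)  (a lazy transformation)
odds = filter(lambda x: x % 2 == 1, data)
# RDD counterpart: .map(lambda x: x * x)             (a lazy transformation)
squares = map(lambda x: x * x, odds)
# RDD counterpart: .reduce(lambda a, b: a + b)       (an action that triggers the work)
total = reduce(lambda a, b: a + b, squares)

print(total)  # 1 + 9 + 25 = 35
```

As with RDD transformations, `filter` and `map` here do nothing until the final reduction consumes them.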

In [None]:
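from pyspark import SparkConf, SparkContext

# appName and master are placeholders, e.g. "notes" and "local[*]"
conf = SparkConf().setAppName(appName).setMaster(master)
# getOrCreate reuses a running context instead of failing when one already exists
sc = SparkContext.getOrCreate(conf=conf)

data = [1, 2, 3, 4, 5]
RDD_data_1 = sc.parallelize(data)      # RDD from a Python list
RDD_data_2 = sc.textFile("data.txt")   # RDD of lines from a text file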

#### Spark dataframe

In [248]:
import pandas as pd
from pyspark.sql import Row

tmp = spark.createDataFrame([
    Row(a=1, b=3, c=1.0),
    Row(a=1, b=3, c=2.0),
    Row(a=1, b=4, c=3.0),
    Row(a=1, b=4, c=4.0)
], schema = ['a','b','c'])
tmp

DataFrame[a: bigint, b: bigint, c: double]

#### pandas-on-Spark DataFrame

In [136]:
import pyspark.pandas as ps

tmp_1 = ps.DataFrame({'a': [1,2,3,4,5,6], 'b':[7,8,9,10,11,12]})
tmp_1.T

| |0|1|2|3|4|5|
| --- | --- | --- | --- | --- | --- | --- |
|a|1|2|3|4|5|6|
|b|7|8|9|10|11|12|


#### Differences between the two DataFrames

pandas-on-Spark removes the single-machine data-size limit of pandas, letting users work with large datasets by leveraging Spark.

Both multi-threading and the Spark SQL Catalyst optimizer contribute to the performance of pandas-on-Spark.

- Users can query data directly via SQL with Spark's optimized SQL engine, as shown below:

In [None]:
import pandas as pd
import pyspark.pandas as ps
pdf = pd.DataFrame({"a": [1, 3, 5]})  # pandas DataFrame
sdf = spark.createDataFrame(pdf)  # PySpark DataFrame
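psdf = sdf.to_pandas_on_spark()  # pandas-on-Spark DataFrame (newer Spark releases prefer sdf.pandas_api())

# Query via SQL; the DataFrame is passed as a keyword argument
ps.sql("SELECT count(*) as num FROM {psdf}", psdf=psdf)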

- It also supports string interpolation syntax to interact with Python objects naturally:

In [None]:
pred = tuple(range(4))

# String interpolation with Python instances, passed as keyword arguments
ps.sql("SELECT * from {psdf} WHERE a IN {pred}", psdf=psdf, pred=pred)

### Read CSV File

In [171]:
df = spark.read.csv(
    'file:///home/test/Desktop/Datasets/Ad DisplayClick Data on Taobao.com/data/ad_feature.csv', 
    header=True)
df.show(3)

+----------+-------+-----------+--------+-----+-----+-------------+
|adgroup_id|cate_id|campaign_id|customer|brand|price|numeric_price|
+----------+-------+-----------+--------+-----+-----+-------------+
|     63133|   6406|      83237|       1|95471|170.0|          170|
|    313401|   6406|      83237|       1|87331|199.0|          199|
|    248909|    392|      83237|       1|32233| 38.0|           38|
+----------+-------+-----------+--------+-----+-----+-------------+
only showing top 3 rows



#### Show the first $n$ rows

In [13]:
df.take(1)

[Row(adgroup_id='63133', cate_id='6406', campaign_id='83237', customer='1', brand='95471', price='170.0')]

#### Show the last $n$ rows

In [14]:
df.tail(1)

[Row(adgroup_id='845337', cate_id='11156', campaign_id='379603', customer='255874', brand='74120', price='279.0')]

#### Show column names

In [7]:
df.columns

['adgroup_id', 'cate_id', 'campaign_id', 'customer', 'brand', 'price']

#### Describe specific column

In [10]:
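# Method 1
df.select("adgroup_id").describe().show()

# Method 2: pass the column name to describe() directly
df.describe("adgroup_id")  # returns the same summary as a DataFrame; add .show() to print it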

+-------+-----------------+
|summary|       adgroup_id|
+-------+-----------------+
|  count|           846811|
|   mean|         423406.0|
| stddev|244453.4237388931|
|    min|                1|
|    max|            99999|
+-------+-----------------+
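
In the summary above, `max` is 99999 even though `count` and `mean` point to much larger IDs. The likely reason is that the CSV was read without schema inference, so `adgroup_id` is a string column and `min`/`max` compare text rather than numbers. The plain-Python sketch below illustrates that effect with made-up sample values.

```python
# Hypothetical sample of IDs, stored as strings like the CSV columns
ids = ["1", "63133", "99999", "313401", "846811"]

print(max(ids))                  # text comparison picks '99999'
print(max(int(x) for x in ids))  # numeric comparison picks 846811
```

In Spark, casting first (for example `df.select(df.adgroup_id.cast("long")).describe()`) or reading the file with `inferSchema=True` should give numeric `min` and `max`.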



### Clean Data

#### Drop NA

In [None]:
df.dropna(how='any', thresh=None).describe().show()

# how : str, optional, 'any' or 'all';
# thresh: int, optional, If specified, drop rows that have less than `thresh` non-null values. 
#         This overwrites the `how` parameter.
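
The interaction between `how` and `thresh` is easy to misread, so here is a plain-Python model of the rule described in the comments above. It is not Spark's implementation; the helper `keeps_row` and the sample rows are made up for illustration, with `None` standing in for null.

```python
def keeps_row(row, how="any", thresh=None):
    """Return True if the dropna rule would keep this row."""
    non_null = sum(value is not None for value in row)
    if thresh is not None:
        # thresh takes priority over how
        return non_null >= thresh
    if how == "any":
        return non_null == len(row)  # drop if any value is null
    if how == "all":
        return non_null > 0          # drop only if every value is null
    raise ValueError("how must be 'any' or 'all'")


rows = [(1, "a", 2.0), (2, None, 3.0), (None, None, None)]
print([keeps_row(r) for r in rows])             # [True, False, False]
print([keeps_row(r, how="all") for r in rows])  # [True, True, False]
print([keeps_row(r, thresh=2) for r in rows])   # [True, True, False]
```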

#### Drop duplicates

In [267]:
help(df.drop_duplicates)

Help on method dropDuplicates in module pyspark.sql.dataframe:

dropDuplicates(subset=None) method of pyspark.sql.dataframe.DataFrame instance
    :func:`drop_duplicates` is an alias for :func:`dropDuplicates`.
    
    .. versionadded:: 1.4



#### Fill NA

help(df.fillna)

fillna(value, subset=None) method of pyspark.sql.dataframe.DataFrame instance
    Replace null values, alias for ``na.fill()``.
    
    :func:`DataFrame.fillna` and :func:`DataFrameNaFunctions.fill` are aliases of each other.
    
    .. versionadded:: 1.3.1
    
    Parameters
    ----------
    value : int, float, string, bool or dict
        Value to replace null values with.
        If the value is a dict, then `subset` is ignored and `value` must be a mapping
        from column name (string) to replacement value. The replacement value must be
        an int, float, boolean, or string.

### Save Data

In [None]:
# Each call writes a directory of part files at the given path, not a single file
df.write.csv('foo.csv', header=True)
df.write.parquet('bar.parquet')
df.write.orc('zoo.orc')

### Slicing and indexing data

`DataFrame.select()` takes column names or `Column` instances and returns another DataFrame.

In [None]:
df["price"]  # a Column object, usable in expressions such as filter() or withColumn()

In [12]:
df.select("adgroup_id")

DataFrame[adgroup_id: string]

#### Show data types

In [70]:
df.select("adgroup_id").dtypes

[('adgroup_id', 'string')]

#### Changing data type


|Data type|Description|
| --- | --- |
|Byte|A 1-byte integer. The range is -128 to 127|
|Short|A 2-byte integer. The range is -32768 to 32767|
|Int|A 4-byte integer. The range is -2147483648 to 2147483647|
|Long|An 8-byte integer. The range is -9223372036854775808 to 9223372036854775807|
|Float|A 4-byte single-precision floating-point number|
|Double|An 8-byte double-precision floating-point number|
|String|A string value|
|Boolean|A boolean value|
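
The integer ranges in the table follow directly from the byte widths: a signed integer with $b$ bits covers $-2^{b-1}$ to $2^{b-1}-1$. The short sketch below recomputes them; the helper name `signed_range` is made up for this example.

```python
def signed_range(num_bytes):
    """Smallest and largest value of a signed two's-complement integer."""
    bits = 8 * num_bytes
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1


for name, size in [("Byte", 1), ("Short", 2), ("Int", 4), ("Long", 8)]:
    low, high = signed_range(size)
    print(f"{name}: {low} to {high}")
```

When an ID column might exceed roughly 2.1 billion, casting to `long` rather than `int` is the safer choice.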

In [96]:
from pyspark.sql.types import IntegerType, BooleanType, DateType

# cast() accepts a type name ("int") or a DataType instance (IntegerType())
res_col = df["customer"].cast("int")
df.select(res_col).dtypes

[('customer', 'int')]

#### Create new column

In [194]:
df = df.withColumn("numeric_price", df.price.cast("long"))

#### Filter rows

In [20]:
df.filter(df.cate_id == 11156).show(5)

+----------+-------+-----------+--------+------+-----+
|adgroup_id|cate_id|campaign_id|customer| brand|price|
+----------+-------+-----------+--------+------+-----+
|    138953|  11156|     137119|     173|183367| 26.0|
|    467512|  11156|     137119|     173|183367| 49.9|
|    140008|  11156|     395908|    1214| 10677|138.0|
|    238772|  11156|     385820|    1214| 10677|  6.0|
|    237471|  11156|     385820|    1214| 58577|148.0|
+----------+-------+-----------+--------+------+-----+
only showing top 5 rows



### Using built-in functions

In [152]:
df.groupby("cate_id").agg({"brand":"mean","price":"max"}).show(5)



+-------+----------+------------------+
|cate_id|max(price)|        avg(brand)|
+-------+----------+------------------+
|      1|    9999.0| 68535.29129662522|
|     10|     999.0|164078.31118881117|
|   1000|       9.9|138492.57142857142|
|  10000|     180.0|          374851.0|
|  10001|     900.0|165290.77777777778|
+-------+----------+------------------+
only showing top 5 rows



                                                                                

In [102]:
df_with_mean_price = df.groupby("cate_id","brand").avg("numeric_price")
df_with_mean_price.show(5)

+-------+------+------------------+
|cate_id| brand|avg(numeric_price)|
+-------+------+------------------+
|   7214|412664|              76.0|
|  11292| 61561|             700.0|
|   7214|105646|              48.5|
|   6806|132353|175.46666666666667|
|   6426|222775|             743.5|
+-------+------+------------------+
only showing top 5 rows



### Using user defined functions

#### spark dataframe

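Plain Python UDFs operate on one row at a time. The pandas-based variants below receive batches of rows as pandas objects instead, which is usually faster.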

In [21]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

- **pandas udf**

In [201]:
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

# Execute function as a Spark vectorized UDF
tmp.select(multiply_func(tmp.a, tmp.b)).show(1)

+-------------------+
|multiply_func(a, b)|
+-------------------+
|                  3|
+-------------------+
only showing top 1 row



- **applyInPandas**: 

    The function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. 
    
    For each group, all columns are passed together as a `pandas.DataFrame` to the user function, and the returned `pandas.DataFrame` objects are combined into a single Spark `DataFrame`.

In [247]:
# applyInPandas takes a plain Python function; no pandas_udf decorator is needed
def plus_mean(pandas_df):
    # Replace column c with the mean of c within the group
    return pandas_df.assign(c=pandas_df.c.mean())

tmp.groupby('a','b').applyInPandas(plus_mean, schema=tmp.schema).show(2)

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  3|1.5|
|  1|  3|1.5|
+---+---+---+
only showing top 2 rows



- **mapInPandas**: 

    The function should take an iterator of `pandas.DataFrame` and return another iterator of `pandas.DataFrame`. 
    
    All columns are passed together as an iterator of `pandas.DataFrame` to the function, and the `pandas.DataFrame` objects it yields are combined into a single Spark `DataFrame`.

In [252]:
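# mapInPandas takes a plain Python function; no pandas_udf decorator is needed
def filter_func(iterator):
    for pdf in iterator:
        pdf.c = pdf.c + 1       # add 1 to column c in each batch
        yield pdf[pdf.a == 1]   # keep only the rows where a == 1

tmp.mapInPandas(filter_func, tmp.schema).show()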

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  3|2.0|
|  1|  3|3.0|
|  1|  4|4.0|
|  1|  4|5.0|
+---+---+---+



#### pandas-on-Spark DataFrame

- **transform**: the function must return data with the same length as its input.
- **transform_batch**: slices the pandas-on-Spark DataFrame or Series into batches, then applies the given function to each batch, with a pandas DataFrame or Series as input and output.

In [170]:
def pandas_plus(pser):
    return pser + 1  

tmp_1.transform(pandas_plus).T

| |0|1|2|3|4|5|
| --- | --- | --- | --- | --- | --- | --- |
|a|2|3|4|5|6|7|
|b|8|9|10|11|12|13|


In [89]:
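# tmp here is assumed to be a pandas-on-Spark DataFrame with columns a, b, c;
# the function receives each batch as a pandas DataFrame
def pandas_plus(pser):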
    if pser.a[0] == 1:
        pser.c = pser.c + 1
    else:
        pser.c = pser.c - 1
    return pser

tmp.pandas_on_spark.transform_batch(pandas_plus).T

21/12/31 00:34:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


| |0|1|2|3|4|5|6|7|
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
|a|1.0|1.0|1.0|1.0|2.0|2.0|2.0|2.0|
|b|3.0|3.0|4.0|4.0|5.0|5.0|6.0|6.0|
|c|2.0|3.0|4.0|5.0|6.0|7.0|8.0|9.0|


- **apply**: the function can return data of any length.
- **apply_batch**: slices the pandas-on-Spark DataFrame into batches, then applies the given function to each batch, with a pandas DataFrame as input and output.

In [169]:
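def pandas_plus(pser):
    # Called once per column (axis=0) with a pandas Series
    return sum(pser)

tmp_1.apply(pandas_plus, axis=0)
# apply_batch instead expects a function that takes and returns a pandas DataFrame:
# tmp_1.pandas_on_spark.apply_batch(lambda pdf: pdf + 1)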

a    21
b    57
dtype: int64

### Working with SQL

In [269]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|  846811|
+--------+



In addition, UDFs can be registered and invoked in SQL out of the box:

In [None]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
# tableA has no column v1; the string column customer is cast to INT here
spark.sql("SELECT add_one(CAST(customer AS INT)) FROM tableA").show()

## Pandas on Spark

The pandas API on Spark (`pyspark.pandas`) implements a large part of the pandas API and runs on Spark's optimized SQL engine.

## Example: User Profile Demo

This demo builds user profiles from the Taobao ad dataset, step by step. The profile covers user value, loyalty, and interests.

User value and loyalty are defined as follows:
- High/low value: the user tends to buy items priced above $P_{avg} + \sigma_{p}$;
- Loyal/common user: the user's transaction frequency exceeds $Trans_{avg}+\sigma_{trans}$.
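
The threshold rule above can be sketched in a few lines of plain Python. All numbers are made up for illustration, and using the population standard deviation (`pstdev`) is an assumption; Spark's `stddev` is the sample version, so results on real data may differ slightly.

```python
from statistics import mean, pstdev

# Hypothetical average purchase price per user
avg_price_by_user = {"u1": 20.0, "u2": 35.0, "u3": 50.0, "u4": 400.0}

prices = list(avg_price_by_user.values())
threshold = mean(prices) + pstdev(prices)  # P_avg + sigma_p

labels = {
    user: "high value" if price > threshold else "low value"
    for user, price in avg_price_by_user.items()
}
print(round(threshold, 2), labels)
```

The loyalty label follows the same pattern, with transaction counts in place of prices.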


User interests are modeled with:

- Modeling labels
    - User interests: current likes / long-term likes / dislikes
    - Interest weights

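#### User behavior log behavior_log: user ID, behavior type, timestamp, category ID, brand ID

This dataset covers the shopping behavior of all users in raw_sample over 22 days (about 700 million records in total). The fields are:
- (1) user: anonymized user ID;
- (2) time_stamp: timestamp;
- (3) btag: behavior type, one of four values: browse (pv), favorite (fav), add to cart (cart), purchase (buy);
- (4) cate: anonymized product category;
- (5) brand: anonymized brand term.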

#### Computing the average price per category and brand

In [5]:
ad_feature = spark.read.csv(
    'file:///home/test/Desktop/Datasets/Ad DisplayClick Data on Taobao.com/data/ad_feature.csv', 
    header=True)

res_col = ad_feature["price"].cast("float")
ad_feature = ad_feature.withColumn("num_price", col = res_col)
ad_mean = ad_feature.groupby("cate_id","brand").avg("num_price")
ad_mean.show(5)

+-------+------+------------------+
|cate_id| brand|    avg(num_price)|
+-------+------+------------------+
|   7214|412664|              76.0|
|  11292| 61561|             700.0|
|   7214|105646|              48.5|
|   6806|132353|175.46666666666667|
|   6426|222775|             743.5|
+-------+------+------------------+
only showing top 5 rows



#### Computing user pvalue_level (consumption tier) from favorited and carted brands

In [3]:
import pandas as pd

user_behavior = spark.read.csv(
    'file:///home/test/Desktop/Datasets/Ad DisplayClick Data on Taobao.com/data/behavior_log.csv', 
    header=True)

res_col = user_behavior["brand"].cast("int")
user_behavior = user_behavior.withColumn("fav_brand", col = res_col)
user_cart = user_behavior.filter(user_behavior.btag == "cart")
user_cart = user_cart.select("user","cate", "brand","fav_brand")

# Within each (user, cate) group, count how many times each brand was added to the cart
def compute_freq(pdf):
    pdf_tmp = pd.Series(pdf.brand)
    CountDict = dict(pdf_tmp.value_counts())
    for i in CountDict.keys():
        tmp_index = pdf.brand == i
        pdf.loc[tmp_index,'fav_brand'] = CountDict[i]
    return pdf

#tmp_3 = tmp.groupby("user","cate").applyInPandas(compute_freq, schema=tmp.schema)
user_cart_faved = user_cart.groupby("user","cate").applyInPandas(compute_freq, schema=user_cart.schema)
user_cart_faved = user_cart_faved.dropDuplicates()
user_cart_faved.show(10)

+-------+----+------+---------+
|   user|cate| brand|fav_brand|
+-------+----+------+---------+
|    100|6423|326445|        1|
|1000000|6554|275122|        1|
|1000000|6952|265474|        2|
|1000000|6952|265474|        2|
|1000000|6952|348810|        1|
|1000000|6952|275122|        2|
|1000000|6952|275122|        2|
|1000000|6952| 59455|        2|
|1000000|6952| 59455|        2|
|1000000|7214|275077|        1|
+-------+----+------+---------+
only showing top 10 rows
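
The per-brand loop in `compute_freq` can also be written as a single pandas group-wise count. The sketch below is an alternative under stated assumptions, not the version used to produce the output above: the function name `compute_freq_vectorized` and the sample rows are made up, and it has only been checked on a small pandas DataFrame shaped like one (user, cate) group.

```python
import pandas as pd


def compute_freq_vectorized(pdf: pd.DataFrame) -> pd.DataFrame:
    """Set fav_brand to the number of rows sharing the same brand in this group."""
    out = pdf.copy()
    out["fav_brand"] = out.groupby("brand")["brand"].transform("count")
    return out


# Hypothetical rows for a single (user, cate) group
group = pd.DataFrame({
    "user": ["1000000"] * 5,
    "cate": ["6952"] * 5,
    "brand": ["265474", "265474", "348810", "275122", "275122"],
    "fav_brand": [265474, 265474, 348810, 275122, 275122],
})
print(compute_freq_vectorized(group))
```

It should be usable as a drop-in argument to `applyInPandas` in place of `compute_freq`, though that has not been tried on the full dataset here.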



                                                                                