In [48]:
import os
import re
import shutil

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import functions as F

#### 构建一个session

In [11]:
# Build (or reuse) a SparkSession that runs Spark locally in this process.
# The master URL must be "local"; the misspelled "lcoal" is not a valid master
# URL, so creating a fresh session with it fails.
session = (
    SparkSession.builder
    .master("local")
    .appName("test")
    .config("spark.some.config.option", "some-value")  # placeholder option copied from the Spark docs
    .getOrCreate()
)

#### 读取数据

In [51]:
# Load the training CSV. header=True takes column names from the first line;
# without inferSchema every column comes back as a string (see dtypes below).
train_file = '/home/jovyan/work/titanic/train.csv'
csv_reader = session.read
df = csv_reader.csv(train_file, header=True)
df.show(5, truncate=False)

+-----------+--------+------+---------------------------------------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age|SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+---+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22 |1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38 |1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26 |0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35 |1    |0    |113803          |53.1   |C

#### pandas 和 spark dataframe 互换

In [54]:
# Spark -> pandas collects every row to the driver; pandas -> Spark distributes it again.
pd_df = df.toPandas()
spark_df = session.createDataFrame(data=pd_df)

#### 行数统计

In [55]:
# Number of rows (runs a Spark job).
n_rows = df.count()
n_rows

891

#### 列数统计

In [58]:
# Number of columns, taken from the list of column names.
n_cols = len(df.columns)
n_cols

12

#### 打印列名

In [59]:
# Column names in schema order.
column_names = df.columns
column_names

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

#### 打印列数据类型

In [60]:
# (column name, Spark type name) pairs.
column_types = df.dtypes
column_types

[('PassengerId', 'string'),
 ('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

#### 打印数据定义

In [61]:
# Print the schema as a tree: each column's name, type and nullability.
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



#### 改变数据类型

In [63]:
# Target type for every column that should not stay a string, applied in order.
# NOTE(review): casting the string Age to IntegerType truncates any fractional
# ages (e.g. "28.5" -> 28); DoubleType would keep them.
cast_plan = {
    'PassengerId': IntegerType(),
    'Survived': IntegerType(),
    'Pclass': IntegerType(),
    'Age': IntegerType(),
    'SibSp': IntegerType(),
    'Parch': IntegerType(),
    'Fare': DoubleType(),
}
for col_name, col_type in cast_plan.items():
    df = df.withColumn(col_name, df[col_name].cast(col_type))
df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'int'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

#### 列操作（增加列）

In [98]:
# FamSize = SibSp + Parch + 1 (the +1 presumably counts the passenger).
# withColumn returns a new DataFrame; df itself is not modified.
fam_size = df['SibSp'] + df['Parch'] + 1
df.withColumn('FamSize', fam_size) \
    .select('SibSp', 'Parch', 'FamSize') \
    .show(5, False)

+-----+-----+-------+
|SibSp|Parch|FamSize|
+-----+-----+-------+
|1    |0    |2      |
|1    |0    |2      |
|0    |0    |1      |
|1    |0    |2      |
|0    |0    |1      |
+-----+-----+-------+
only showing top 5 rows



#### 选取指定列

In [70]:
# Project a subset of columns.
wanted_cols = ['PassengerId', 'Survived', 'Name']
df.select(*wanted_cols).show(5, False)

+-----------+--------+---------------------------------------------------+
|PassengerId|Survived|Name                                               |
+-----------+--------+---------------------------------------------------+
|1          |0       |Braund, Mr. Owen Harris                            |
|2          |1       |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|
|3          |1       |Heikkinen, Miss. Laina                             |
|4          |1       |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |
|5          |0       |Allen, Mr. William Henry                           |
+-----------+--------+---------------------------------------------------+
only showing top 5 rows



#### 删除指定列

In [75]:
# drop returns a new DataFrame without 'Age'; df keeps all its columns.
without_age = df.drop('Age')
without_age.show(5, False)

+-----------+--------+------+---------------------------------------------------+------+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|1    |0    |113803          |53.1   |C123 |S       |
|5          |

#### 条件过滤

In [94]:
# Combine conditions with & (and) / | (or); each side needs its own parentheses.
# show() only prints and returns None, so its result is not assigned
# (the original bound None to `test`).
is_male_not_survived = (df.Survived == 0) & (df.Sex == 'male')
df.filter(is_male_not_survived).select('Survived', 'Sex').show(5, False)

+--------+----+
|Survived|Sex |
+--------+----+
|0       |male|
|0       |male|
|0       |male|
|0       |male|
|0       |male|
+--------+----+
only showing top 5 rows



#### 过滤中的子方法

In [96]:
# between() is inclusive on both ends: 22 <= Age <= 24.
age_in_range = df['Age'].between(22, 24)
df.filter(age_in_range).select('PassengerId', 'Age').show(5, False)

+-----------+---+
|PassengerId|Age|
+-----------+---+
|1          |22 |
|61         |22 |
|81         |22 |
|89         |23 |
|90         |24 |
+-----------+---+
only showing top 5 rows



In [97]:
# isin(): keep rows whose Pclass is one of the listed values.
pclass_1_or_2 = df['Pclass'].isin([1, 2])
df.filter(pclass_1_or_2).show(5, False)

+-----------+--------+------+---------------------------------------------------+------+---+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age|SibSp|Parch|Ticket  |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+---+-----+-----+--------+-------+-----+--------+
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38 |1    |0    |PC 17599|71.2833|C85  |C       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35 |1    |0    |113803  |53.1   |C123 |S       |
|7          |0       |1     |McCarthy, Mr. Timothy J                            |male  |54 |0    |0    |17463   |51.8625|E46  |S       |
|10         |1       |2     |Nasser, Mrs. Nicholas (Adele Achem)                |female|14 |1    |0    |237736  |30.0708|null |C       |
|12         |1       |1     |Bonnell, Mis

#### UDF函数

In [111]:
# Python UDF returning the length of each Name. `udf` is not imported by name
# in this notebook, so it is taken from the `F` alias.
# returnType defaults to StringType when omitted, hence the explicit IntegerType().
# Null values reach the lambda as None; return None for them instead of raising.
# (F.length is the built-in, non-UDF way to do the same thing.)
name_length = F.udf(lambda s: len(s) if s is not None else None, IntegerType())
df.select(name_length(df['Name']).alias('NameLen')).show(5, False)

+-------+
|NameLen|
+-------+
|23     |
|51     |
|22     |
|44     |
|24     |
+-------+
only showing top 5 rows



#### sort 

In [115]:
# Age descending, ties broken by Fare ascending (orderBy is an alias of sort).
df.orderBy(F.desc('Age'), F.asc('Fare')).select('Age', 'Fare').show(10, False)

+---+-------+
|Age|Fare   |
+---+-------+
|80 |30.0   |
|74 |7.775  |
|71 |34.6542|
|71 |49.5042|
|70 |7.75   |
|70 |10.5   |
|70 |71.0   |
|66 |10.5   |
|65 |7.75   |
|65 |26.55  |
+---+-------+
only showing top 10 rows



#### drop duplicates（去重）

In [122]:
# Three equivalent ways to keep distinct (Age, Sex) pairs.
age_sex = df.select(['Age', 'Sex'])
for dedup in (age_sex.distinct, age_sex.dropDuplicates, age_sex.drop_duplicates):
    dedup().show(5, False)

+---+------+
|Age|Sex   |
+---+------+
|10 |male  |
|24 |male  |
|70 |male  |
|62 |female|
|38 |female|
+---+------+
only showing top 5 rows

+---+------+
|Age|Sex   |
+---+------+
|10 |male  |
|24 |male  |
|70 |male  |
|62 |female|
|38 |female|
+---+------+
only showing top 5 rows

+---+------+
|Age|Sex   |
+---+------+
|10 |male  |
|24 |male  |
|70 |male  |
|62 |female|
|38 |female|
+---+------+
only showing top 5 rows



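#### isnull
null: a missing value (there is no value at all)
NaN: "not a number", a special floating-point value; `isnull` does not match NaN (use `isnan` for that)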

In [124]:
# Rows whose Cabin is null. `isnull` is not imported by name in this notebook,
# so use F.isnull (df['Cabin'].isNull() is equivalent).
df.filter(F.isnull('Cabin')).select('PassengerId', 'Cabin').show(5, False)

+-----------+-----+
|PassengerId|Cabin|
+-----------+-----+
|1          |null |
|3          |null |
|5          |null |
|6          |null |
|8          |null |
+-----------+-----+
only showing top 5 rows



#### remove null row

In [125]:
# Drop rows whose Cabin is null (df.na.drop is the same as df.dropna).
cabin_known = df.na.drop(subset=['Cabin'])
cabin_known.select('PassengerId', 'Cabin').show(5, False)

+-----------+-----+
|PassengerId|Cabin|
+-----------+-----+
|2          |C85  |
|4          |C123 |
|7          |E46  |
|11         |G6   |
|12         |C103 |
+-----------+-----+
only showing top 5 rows



#### 填充缺失值

In [126]:
# Per-column replacement values for nulls (df.na.fill is the same as df.fillna).
fill_values = {'Cabin': 'NaCabin', 'Age': -1}
df.na.fill(fill_values).select('Age', 'Cabin').show(10, False)

+---+-------+
|Age|Cabin  |
+---+-------+
|22 |NaCabin|
|38 |C85    |
|26 |NaCabin|
|35 |C123   |
|35 |NaCabin|
|-1 |NaCabin|
|54 |E46    |
|2  |NaCabin|
|27 |NaCabin|
|14 |NaCabin|
+---+-------+
only showing top 10 rows



####  sampling

In [132]:
# Roughly 5% of rows, without replacement; the fixed seed makes it repeatable.
sampled = df.sample(fraction=0.05, withReplacement=False, seed=2)
sampled.show(5, False)

+-----------+--------+------+-------------------------------------------------------+------+---+-----+-----+----------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                                   |Sex   |Age|SibSp|Parch|Ticket    |Fare   |Cabin|Embarked|
+-----------+--------+------+-------------------------------------------------------+------+---+-----+-----+----------+-------+-----+--------+
|85         |1       |2     |Ilett, Miss. Bertha                                    |female|17 |0    |0    |SO/C 14885|10.5   |null |S       |
|86         |1       |3     |Backstrom, Mrs. Karl Alfred (Maria Mathilda Gustafsson)|female|33 |3    |0    |3101278   |15.85  |null |S       |
|126        |1       |3     |Nicola-Yarred, Master. Elias                           |male  |12 |1    |0    |2651      |11.2417|null |C       |
|130        |0       |3     |Ekstrom, Mr. Johan                                     |male  |45 |0    |0    |347061    |6.975  |null |S       |

#### 分层抽样

In [141]:
# Stratified sample by Sex: about 10% of 'male' rows and 20% of 'female' rows.
sex_fractions = {'male': 0.1, 'female': 0.2}
test = df.sampleBy(col='Sex', fractions=sex_fractions, seed=2)

# Stratum sizes in the full data: filter df with its own column
# (the original filtered df with test.Sex, mixing columns of two DataFrames).
for sex in ('male', 'female'):
    print(df.filter(df.Sex == sex).count())
# Stratum sizes in the sample.
for sex in ('male', 'female'):
    print(test.filter(test.Sex == sex).count())

577
314
60
50


#### 分组 groupby + agg

In [142]:
# Aggregates per (Pclass, Survived):
#   count / countDistinct / approx_count_distinct, max, avg (F.mean is an alias of F.avg), alias.
# The functions come from the `F` alias: they are not imported by name here,
# and a bare `max` would resolve to Python's builtin max.
(
    df.groupBy(['Pclass', 'Survived'])
    .agg(
        F.count('Embarked').alias('cnt_embarked'),
        F.countDistinct('Embarked').alias('cnt_dist_embarked'),
        # On very large data, approx_count_distinct trades some accuracy for speed.
        F.approx_count_distinct('Embarked').alias('app_cnt_dist_embarked'),
        F.max('Age').alias('max_age'),
        F.avg('Age').alias('avg_age'),
    )
    .show(10, False)
)

+------+--------+------------+-----------------+---------------------+-------+------------------+
|Pclass|Survived|cnt_embarked|cnt_dist_embarked|app_cnt_dist_embarked|max_age|avg_age           |
+------+--------+------------+-----------------+---------------------+-------+------------------+
|1     |0       |80          |3                |3                    |71     |43.6875           |
|3     |1       |119         |3                |3                    |63     |20.623529411764707|
|1     |1       |134         |3                |3                    |80     |35.36065573770492 |
|2     |1       |87          |3                |3                    |62     |25.867469879518072|
|2     |0       |97          |3                |3                    |70     |33.53333333333333 |
|3     |0       |372         |3                |3                    |74     |26.52962962962963 |
+------+--------+------------+-----------------+---------------------+-------+------------------+



#### Join key名一样

In [147]:
# Second DataFrame: Sex translated to Japanese and renamed to Sex_JP.
sex_to_jp = {'male': '男性', 'female': '女性'}
new_df = df.replace(list(sex_to_jp), list(sex_to_jp.values()), 'Sex') \
    .selectExpr('PassengerId', 'Sex AS Sex_JP', 'Fare')
# Join on key columns with identical names; each key appears once in the result.
base_df = df.selectExpr('PassengerId', 'Sex', 'Fare')
base_df.join(new_df, ['PassengerId', 'Fare']).show(10, False)

+-----------+-------+------+------+
|PassengerId|Fare   |Sex   |Sex_JP|
+-----------+-------+------+------+
|1          |7.25   |male  |男性    |
|2          |71.2833|female|女性    |
|3          |7.925  |female|女性    |
|4          |53.1   |female|女性    |
|5          |8.05   |male  |男性    |
|6          |8.4583 |male  |男性    |
|7          |51.8625|male  |男性    |
|8          |21.075 |male  |男性    |
|9          |11.1333|female|女性    |
|10         |30.0708|female|女性    |
+-----------+-------+------+------+
only showing top 10 rows



#### join key名不一样

In [151]:
old_df = df.selectExpr('PassengerId AS Old_PassengerId', 'Sex', 'Fare AS Old_Fare')
# Key names differ, so pass explicit equality conditions; a list of conditions is ANDed.
join_cond = [
    old_df.Old_PassengerId == new_df.PassengerId,
    old_df.Old_Fare == new_df.Fare,
]
# how defaults to 'inner'; other values include outer, left_outer, right_outer, leftsemi.
outer_joined = old_df.join(new_df, on=join_cond, how='outer')
outer_joined.show(10, False)

+---------------+------+--------+-----------+------+--------+
|Old_PassengerId|Sex   |Old_Fare|PassengerId|Sex_JP|Fare    |
+---------------+------+--------+-----------+------+--------+
|121            |male  |73.5    |121        |男性    |73.5    |
|644            |male  |56.4958 |644        |男性    |56.4958 |
|655            |female|6.75    |655        |女性    |6.75    |
|690            |female|211.3375|690        |女性    |211.3375|
|18             |male  |13.0    |18         |男性    |13.0    |
|117            |male  |7.75    |117        |男性    |7.75    |
|225            |male  |90.0    |225        |男性    |90.0    |
|347            |female|13.0    |347        |女性    |13.0    |
|568            |female|21.075  |568        |女性    |21.075  |
|716            |male  |7.65    |716        |男性    |7.65    |
+---------------+------+--------+-----------+------+--------+
only showing top 10 rows



#### pivot

In [159]:
# Average Fare per (Pclass, Survived), with one column per distinct Sex value.
# F.round / F.avg are required: a bare `round` is Python's builtin, which cannot
# round a Spark Column, and `avg` is not imported by name.
(
    df.groupBy('Pclass', 'Survived')
    .pivot('Sex')
    .agg(F.round(F.avg('Fare'), 2))
    .show(10, False)
)

+------+--------+------+-----+
|Pclass|Survived|female|male |
+------+--------+------+-----+
|1     |0       |110.6 |62.89|
|3     |1       |12.46 |15.58|
|1     |1       |105.98|74.64|
|2     |1       |22.29 |21.1 |
|2     |0       |18.25 |19.49|
|3     |0       |19.77 |12.2 |
+------+--------+------+-----+



#### broadcast 快速join
broadcast 的目的是把（较小的）数据复制成多个副本，发送给 cluster 的各个节点，这样 join 时大表不需要 shuffle，从而加快分布式处理。

In [163]:
# Same join as above, but hint Spark to broadcast new_df to every executor.
# `broadcast` is not imported by name in this notebook, so use F.broadcast.
df.selectExpr('PassengerId', 'Sex', 'Fare') \
    .join(F.broadcast(new_df), ['PassengerId', 'Fare']) \
    .show(10, False)

+-----------+-------+------+------+
|PassengerId|Fare   |Sex   |Sex_JP|
+-----------+-------+------+------+
|1          |7.25   |male  |男性    |
|2          |71.2833|female|女性    |
|3          |7.925  |female|女性    |
|4          |53.1   |female|女性    |
|5          |8.05   |male  |男性    |
|6          |8.4583 |male  |男性    |
|7          |51.8625|male  |男性    |
|8          |21.075 |male  |男性    |
|9          |11.1333|female|女性    |
|10         |30.0708|female|女性    |
+-----------+-------+------+------+
only showing top 10 rows



#### 缓存dataframe cache用法
persist: http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

In [165]:
# Check whether df is cached. Print it explicitly: a bare expression that is
# not the last line of a cell is never displayed.
print(df.is_cached)

# Mark df for caching ...
df.cache()

# ... and release the cache again.
df.unpersist()

# Example: cache an aggregated result that is used by more than one action.
# count / approx_count_distinct come from the `F` alias (not imported by name).
result_df = (
    df.groupBy(['Pclass', 'Survived'])
    .agg(
        F.count('PassengerId').alias('cnt_ID'),
        F.approx_count_distinct('PassengerId').alias('app_cnt_dist_ID'),
    )
    .cache()
)

# Caching is lazy: the first action computes result_df and stores it, so it is slower.
result_df.show(2, False)

# Later actions read the cached data and finish quickly.
result_df.count()

# Release the memory once the result is no longer needed.
result_df.unpersist()

+------+--------+------+---------------+
|Pclass|Survived|cnt_ID|app_cnt_dist_ID|
+------+--------+------+---------------+
|1     |0       |80    |82             |
|3     |1       |119   |120            |
+------+--------+------+---------------+
only showing top 2 rows



DataFrame[Pclass: int, Survived: int, cnt_ID: bigint, app_cnt_dist_ID: bigint]

#### partition

In [175]:
# repartition(10) shuffles the data into 10 partitions (lazily).
df_repartitioned = df.repartition(10)
# Partition counts before and after repartitioning.
for frame in (df, df_repartitioned):
    print(frame.rdd.getNumPartitions())
# coalesce(1) merges partitions back into one.
df_repartitioned.coalesce(1).rdd.getNumPartitions()

1
10


1

#### write

In [188]:
# Write CSV: Spark writes a folder 'df_csv' with one part file per partition.
df.write.mode('overwrite').csv('df_csv', header=True, sep=';')
# Write a single CSV part file: merge everything into ONE partition first.
# (coalesce(10), as originally written, can still produce up to 10 files.)
df.coalesce(1).write.mode('overwrite').csv('df_csv', header=True, sep=';')

#### 合并结果文件夹中的多个文件为一个文件

In [189]:
def CombineSparkOutput(csv_path, header=None, sep=',', new_csv_name=None, remove_original_file=False):
    """Combine the CSV part files written by Spark into one CSV file.

    Parameters
    ----------
    csv_path : string
        Path of the folder Spark wrote (``DataFrame.write.csv``).
    header : None or int
        Passed to ``pandas.read_csv``: ``None`` when the part files have no
        header row, ``0`` when every part file starts with one. The combined
        file gets a single header row only when ``header`` is not None.
    sep : string
        Field separator used both for reading and for writing.
    new_csv_name : string
        File name of the combined csv file.
    remove_original_file : boolean, default False
        If True, delete the ``csv_path`` folder afterwards.

    Returns
    -------
    None

    Raises
    ------
    FileNotFoundError
        If ``csv_path`` contains no ``*csv`` files.
    """
    # All csv part files, sorted so rows keep the part-file order
    # (os.listdir order is arbitrary). Files like _SUCCESS / *.crc are skipped.
    all_csv_files = sorted(name for name in os.listdir(csv_path) if name.endswith('csv'))
    if not all_csv_files:
        raise FileNotFoundError('no csv files found in {}'.format(csv_path))

    frames = []
    for csv_file in all_csv_files:
        part_path = os.path.join(csv_path, csv_file)
        # Skip empty (0-byte) part files: pandas cannot parse them.
        if os.path.getsize(part_path) > 0:
            frames.append(pd.read_csv(part_path, header=header, sep=sep))

    # Concatenate once (DataFrame.append is deprecated and removed in pandas 2.0).
    total_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    # to_csv's `header` is a bool/list, so translate read_csv's header argument
    # (header=0 must still write the column names). The pandas index is not data.
    total_df.to_csv(new_csv_name, header=header is not None, sep=sep, index=False)

    # Delete the original Spark output folder if requested.
    if remove_original_file:
        shutil.rmtree(csv_path)

In [190]:
# Spark output: ';'-separated part files without a header row.
df.write.mode('overwrite').csv('df_csv', header=False, sep=';')
# pandas: merge the part files into df.csv and delete the df_csv folder.
CombineSparkOutput('df_csv', header=None, sep=';', new_csv_name='df.csv', remove_original_file=True)

#### SQL

In [208]:
view_name = "df_view"

# Register df as a temporary view usable from SQL.
df.createOrReplaceTempView(view_name)

# Columns and types of the view, then the tables/views that currently exist.
session.sql('DESC df_view').show(10, False)
session.sql('SHOW TABLES').show(10, False)

# Drop the view via the catalog API
# (SQL alternative: session.sql('DROP VIEW IF EXISTS df_view')).
session.catalog.dropTempView(view_name)

# The view is no longer listed.
session.sql('SHOW TABLES').show(10, False)



+-----------+---------+-------+
|col_name   |data_type|comment|
+-----------+---------+-------+
|PassengerId|int      |null   |
|Survived   |int      |null   |
|Pclass     |int      |null   |
|Name       |string   |null   |
|Sex        |string   |null   |
|Age        |int      |null   |
|SibSp      |int      |null   |
|Parch      |int      |null   |
|Ticket     |string   |null   |
|Fare       |double   |null   |
+-----------+---------+-------+
only showing top 10 rows

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |df_view  |true       |
+--------+---------+-----------+

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



####  database导入

In [None]:
# Create the database; LOCATION is the (placeholder) S3 directory holding its data.
session.sql("CREATE DATABASE IF NOT EXISTS titanic "
            "COMMENT 'test db' "
            "LOCATION 'S3/path/to' ")

# Write df into a table. mode('overwrite') so re-running the cell does not fail
# because titanic.test already exists (the default save mode raises).
df.write.mode('overwrite').saveAsTable('titanic.test')

# Same table written explicitly as Parquet and partitioned by a real column.
# Fixes: `df_graph` was undefined (df is meant); partitionBy('') names no column;
# the option key is `path`; and the table path must not be the database
# LOCATION itself, or overwrite would wipe that directory (placeholder path).
(
    df.write
    .format('parquet')
    .partitionBy('Pclass')
    .mode('overwrite')
    .saveAsTable('titanic.test', path='S3/path/to/test')
)

#### 版本确认

In [212]:
# Version string of the running Spark.
spark_version = session.version
spark_version

'2.3.0'

#### parquet 文件读写

In [None]:
# Write df as Parquet, replacing any previous output.
df.write.mode('overwrite').parquet('df_parquet')
# Read it back through this notebook's session (`spark` is never defined here).
df2 = session.read.parquet('df_parquet')

#### 两个dataframe比较差值

In [None]:
# Rows of df that are not in df2 (equivalent to EXCEPT DISTINCT in SQL).
only_in_df = df.subtract(df2)
only_in_df.show(10, False)