In [1]:
from pyspark.sql import SparkSession 

# Entry point for all DataFrame work below: reuse the active SparkSession if
# one already exists, otherwise create one with the builder's default settings.
spark = SparkSession.builder.getOrCreate()

In [3]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

# Build a DataFrame from a list of Row objects. No schema is passed, so the
# column types are inferred from the Python values (see the repr below).
df = spark.createDataFrame([
    Row(a=1, b=1.2, c='stinrg3'),
    Row(a=2, b=1.3, c='string33')
])
df

DataFrame[a: bigint, b: double, c: string]

In [4]:
# Same data, but with an explicit schema: the column names and types are
# spelled out in a DDL-style string instead of being inferred from the rows.
df = spark.createDataFrame([
    Row(a=1, b=1.2, c='stinrg3'),
    Row(a=2, b=1.3, c='string33')
], schema='a long, b double, c string')
df

DataFrame[a: bigint, b: double, c: string]

In [7]:
# Create a PySpark DataFrame from a pandas DataFrame; the pandas column
# labels (a, b, c) become the Spark column names.
pdf = pd.DataFrame({
    'a':[1,2], 
    'b':[1.2,1.4],
    'c':['string3','stinrg3']
})
df = spark.createDataFrame(pdf)
df

DataFrame[a: bigint, b: double, c: string]

In [10]:
# Create a PySpark DataFrame from an RDD of plain tuples.
rdd = spark.sparkContext.parallelize([
    (1,1.2,'string3'), 
    (2, 1.3,'stirna')
])

# Here the schema is only a list of column names; the column types are still
# inferred from the tuple values (see the repr below).
df = spark.createDataFrame(rdd, schema =['a','b', 'c'])
df

DataFrame[a: bigint, b: double, c: string]

In [13]:
# Print the rows as an ASCII table, then the column names/types as a tree.
df.show()
df.printSchema()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|1.2|string3|
|  2|1.3| stirna|
+---+---+-------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)



In [15]:
# Turn on eager evaluation for the notebook: from here on, a bare DataFrame
# as the last expression of a cell renders its rows instead of only the
# `DataFrame[...]` schema repr seen in the earlier cells.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c
1,1.2,string3
2,1.3,stirna


In [16]:
# Print only the first row, laid out vertically (one field per line).
df.show(1, vertical=True)

-RECORD 0------
 a   | 1       
 b   | 1.2     
 c   | string3 
only showing top 1 row



In [18]:
# Column names of the DataFrame as a plain Python list.
df.columns

['a', 'b', 'c']

In [19]:
# Summary statistics (count, mean, stddev, min, max) for columns a and b.
df.select('a','b').describe().show()

+-------+------------------+-------------------+
|summary|                 a|                  b|
+-------+------------------+-------------------+
|  count|                 2|                  2|
|   mean|               1.5|               1.25|
| stddev|0.7071067811865476|0.07071067811865482|
|    min|                 1|                1.2|
|    max|                 2|                1.3|
+-------+------------------+-------------------+



In [22]:
# collect() returns every row of the DataFrame as a list of Row objects.
# Its result is discarded here: a notebook cell only displays its last
# expression, so the output below comes from take(1) alone.
df.collect()
# take(1) returns a list holding just the first row.
df.take(1)

[Row(a=1, b=1.2, c='string3')]

In [23]:
# Convert the Spark DataFrame back to a pandas DataFrame. All rows are brought
# into local memory, so only do this when the result is small.
df.toPandas()

Unnamed: 0,a,b,c
0,1,1.2,string3
1,2,1.3,stirna


In [24]:
# Attribute access yields a Column expression object, not the column's data.
df.a

Column<'a'>

In [25]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

# A plain column reference, a function applied to a column, and a predicate
# on a column all have the same type, so this chained comparison is True.
type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [27]:
# Project the DataFrame down to column c only and print the result.
df.select(df.c).show()

+-------+
|      c|
+-------+
|string3|
| stirna|
+-------+



In [28]:
# withColumn(new_column_name, column_expression): add a column computed from
# existing ones. Here upper_c is the upper-cased value of c.
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+-------+
|  a|  b|      c|upper_c|
+---+---+-------+-------+
|  1|1.2|string3|STRING3|
|  2|1.3| stirna| STIRNA|
+---+---+-------+-------+



In [30]:
# Keep only the rows where a == 1 and print them.
df.filter(df.a==1).show()
# No row has b > 2, so this last expression displays just the column header.
df.filter(df.b>2)

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|1.2|string3|
+---+---+-------+



a,b,c


In [35]:
# Strict comparison: the row with b == 1.2 is excluded, only b == 1.3 remains.
df.filter(df.b>1.2)

a,b,c
2,1.3,stirna


In [36]:
import pandas
from pyspark.sql.functions import pandas_udf

In [37]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    """Return ``series`` with 1 added to every element.

    Wrapped by ``pandas_udf`` with a declared return type of ``'long'``, so it
    can be applied to a Spark column while operating on pandas Series.
    """
    incremented = series + 1
    return incremented

# Apply the pandas UDF to column a; the result column is named after the call.
df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
+------------------+



In [38]:
def pandas_filter_funct(iterator):
    """Lazily filter a stream of pandas DataFrames down to rows with ``a == 1``.

    Args:
        iterator: an iterable of pandas DataFrames, each with a column ``a``.

    Yields:
        One DataFrame per input DataFrame, holding only the rows whose ``a``
        equals 1 (possibly empty); the original index labels are kept.
    """
    for batch in iterator:
        is_one = batch.a == 1
        yield batch.loc[is_one]
        
# Run pandas_filter_funct over the DataFrame as an iterator of pandas
# DataFrames; the output is declared to have the same schema as the input.
df.mapInPandas(pandas_filter_funct, schema=df.schema).show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|1.2|string3|
+---+---+-------+



In [39]:
# Sample data for the grouping examples: two string keys (color, fruit) and
# two numeric value columns (v1, v2). Note that this rebinds `df`.
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [40]:
# Average of the numeric columns (v1, v2) within each color group.
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [43]:
# Number of rows for each (color, fruit) combination.
df.groupby(['color', 'fruit']).count().show()

+-----+------+-----+
|color| fruit|count|
+-----+------+-----+
|  red|banana|    2|
| blue|banana|    1|
|  red|carrot|    2|
| blue| grape|    1|
|black|carrot|    1|
|  red| grape|    1|
+-----+------+-----+



In [45]:
def plus_mean(pandas_df):
    """Center the ``v1`` column of ``pandas_df`` around its mean.

    Despite the name, the mean of ``v1`` is *subtracted* from every ``v1``
    value (it is not added).

    Args:
        pandas_df: a pandas DataFrame with a numeric ``v1`` column.

    Returns:
        A new DataFrame in which ``v1`` is replaced by ``v1 - mean(v1)``; all
        other columns are carried over as they are.
    """
    centered_v1 = pandas_df.v1 - pandas_df.v1.mean()
    return pandas_df.assign(v1=centered_v1)

# Apply plus_mean to each color group as a pandas DataFrame.
# NOTE(review): schema=df.schema keeps v1 declared with its original integer
# type (presumably long, since it was built from Python ints), while plus_mean
# produces fractional values (the red group's mean is 4.8). The whole numbers
# shown below are therefore truncated results; consider declaring v1 as
# double in the output schema if the exact centered values matter.
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



In [48]:
# Left side of the as-of join example: one v1 reading per (time, id), with
# time encoded as a yyyymmdd integer.
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

# Right side: a single v2 label per id, only available for the first date.
df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

In [49]:
def asof_join(l, r):
    """As-of join two pandas DataFrames on ``time`` within matching ``id``.

    Args:
        l: left pandas DataFrame with ``time`` and ``id`` columns.
        r: right pandas DataFrame with ``time`` and ``id`` columns.

    Returns:
        The result of ``pd.merge_asof``: every row of ``l``, extended with
        the columns of the row in ``r`` that has the same ``id`` and the
        nearest ``time`` under pandas' default matching direction.
    """
    joined = pd.merge_asof(left=l, right=r, by='id', on='time')
    return joined

# Co-group df1 and df2 by id, then run asof_join on each pair of per-id pandas
# DataFrames. The schema string declares the columns of the combined result.
df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
asof_join, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

