In [None]:
from pyspark.sql import SparkSession

# Entry point for the whole notebook: getOrCreate() reuses an already-running
# session (e.g. on re-run) instead of failing or starting a second one.
spark = (
    SparkSession.builder
    .appName("QuickStart")
    .getOrCreate()
)

In [None]:
from datetime import date, datetime

import pandas as pd
from pyspark.sql import Row

# Build a DataFrame from Row objects; Spark infers the column types from the
# Python values (int, float, str, date, datetime).
# NOTE(review): these values (a=4, b=5., 2001/2002 dates) differ from the
# rows used in the next two cells — confirm that is intentional.
df = spark.createDataFrame(data=[
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2001, 2, 2), e=datetime(2001, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2002, 1, 3, 12, 0)),
])
df

In [None]:
# Same data as plain tuples, with the column names and types given up front
# as a DDL-style schema string instead of being inferred.
df = spark.createDataFrame(
    data=[
        (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
    ],
    schema='a long, b double, c string, d date, e timestamp',
)
df

In [None]:
from datetime import date, datetime
import pandas as pd

# The same three rows as the previous cell, this time held in a pandas DataFrame.
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
# Column 'd' holds Python `date` objects, so pandas reports it as `object`.
# It is deliberately left that way: Spark then infers a `date` column, which
# matches the 'd date' schema of the previous cell. Converting it with
# pd.to_datetime would make 'd' a Spark timestamp column instead.
print(pandas_df.dtypes)
# Spark derives the column names and types from the pandas frame.
df = spark.createDataFrame(data=pandas_df)
df

In [None]:
df.show(n=20)  # 20 is show()'s default row limit; spelled out for clarity

In [None]:
# Row count, column names and full StructType schema, one per line.
print(df.count(), df.columns, df.schema, sep='\n')



In [None]:
df.show(n=1)  # only the first row

In [None]:
# Enable eager evaluation so a bare DataFrame at the end of a cell is rendered
# as a table of rows. This setting stays on for the rest of the session.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
df

In [None]:
df.show(n=1, vertical=True)  # one row, printed as "column | value" lines

In [None]:
df.schema.names  # the column names in schema order, the same list df.columns returns

In [None]:
# Print the schema as a tree: each column with its type and nullability.
df.printSchema()

In [None]:
# Summary statistics (count, mean, stddev, min, max) for columns a, b and c only.
df.describe("a", "b", "c").show()

In [None]:
# Bring every row back to the driver as a list of Row objects.
# Fine for this tiny example; avoid on large DataFrames (use take/limit instead).
df.collect()

In [None]:
df.head(1)  # with an explicit n, head(n) is take(n): a list holding the first Row

In [None]:
# Collect the whole DataFrame to the driver as a pandas DataFrame.
# Only safe when the data fits in driver memory.
df.toPandas()

In [None]:
df["a"]  # item-style access; returns the same Column object as df.a

In [None]:
df.select("c").show()  # select a column by name rather than by Column object

In [None]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

# A column reference, a function result and a predicate are all Column objects.
# `isNull` has to be *called*: `df.c.isNull` on its own is a bound method, which
# made this comparison always evaluate to False.
type(df.c) == type(upper(df.c)) == type(df.c.isNull())

In [None]:
df.select(df["c"]).show()  # same projection as above, using item-style column access

In [None]:
# Add a derived column holding the upper-cased value of column c.
df.withColumn('upper_c', upper('c')).show()

In [None]:
df.where(df["a"] == 1).show()  # where() is an alias of filter()

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Vectorized UDF returning 'long'. The Series -> Series type hints tell
# pandas_udf that each call receives and returns a batch of values.
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    """Return the incoming batch with 1 added to every element."""
    return series.add(1)

df.select(pandas_plus_one(df["a"])).show()

In [None]:
def pandas_filter_func(iterator):
    """Lazily yield, for each pandas DataFrame batch, only the rows where a == 1."""
    yield from (batch[batch['a'] == 1] for batch in iterator)

# Filtering keeps the column layout, so the output schema is the input schema.
df.mapInPandas(pandas_filter_func, df.schema).show()

In [None]:
# Small produce dataset used by the grouping examples that follow.
df = spark.createDataFrame(
    [
        ('red', 'banana', 1, 10),
        ('blue', 'banana', 2, 20),
        ('red', 'carrot', 3, 30),
        ('blue', 'grape', 4, 40),
        ('red', 'carrot', 5, 50),
        ('black', 'carrot', 6, 60),
        ('red', 'banana', 7, 70),
        ('red', 'grape', 8, 80),
    ],
    schema=['color', 'fruit', 'v1', 'v2'],
)
df.show()

In [None]:
# Average of every numeric column within each color group.
# show must be called; without "()" the cell only displays the bound method.
df.groupby('color').avg().show()

In [None]:
def plus_mean(pandas_df):
    """Return a copy of the group with v1 centered on its mean.

    Despite the name, this *subtracts* the group mean of v1. All other
    columns are passed through unchanged, and the input frame is not modified.
    """
    centered = pandas_df['v1'] - pandas_df['v1'].mean()
    return pandas_df.assign(v1=centered)

# Run plus_mean once per color group; the output keeps the input schema.
df.groupBy('color').applyInPandas(plus_mean, df.schema).show()

## Getting Data In/Out

In [None]:
# Write the DataFrame as CSV (Spark creates a directory named 'foo.csv' holding
# part files), then read it back.
# mode='overwrite' keeps this cell re-runnable: the default save mode raises
# an error when the target path already exists.
df.write.csv('foo.csv', mode='overwrite', header=True)
# Without inferSchema, every column is read back as a string.
spark.read.csv('foo.csv', header=True).show()