# Quickstart: DataFrame
From https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html

In [2]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/15 15:11:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df   

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [13]:
pandas_df = pd.DataFrame({
 'a': [1, 2, 3],
 'b': [2., 3., 4.],
 'c': ['string1', 'string2', 'string3'],
 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)]   
})
df = spark.createDataFrame(pandas_df)
df.show(1)

+---+---+-------+----------+
|  a|  b|      c|         d|
+---+---+-------+----------+
|  1|2.0|string1|2000-01-01|
+---+---+-------+----------+
only showing top 1 row



In [16]:
# Set up eager eval of df (No need to call df.show() to see the result)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d
1,2.0,string1,2000-01-01
2,3.0,string2,2000-02-01
3,4.0,string3,2000-03-01


In [18]:
df.show(1, vertical=True)

-RECORD 0---------
 a   | 1          
 b   | 2.0        
 c   | string1    
 d   | 2000-01-01 
only showing top 1 row



In [20]:
df.columns

['a', 'b', 'c', 'd']

In [21]:
df.select('a', 'b').show()

+---+---+
|  a|  b|
+---+---+
|  1|2.0|
|  2|3.0|
|  3|4.0|
+---+---+



In [23]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



In [24]:
type(df)

pyspark.sql.dataframe.DataFrame

In [25]:
spark.read

<pyspark.sql.readwriter.DataFrameReader at 0x11f5aa190>

In [27]:
# Get remote data (collects) to local. Can throw error if out of memory
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1))]

In [28]:
df.take(2)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1))]

In [30]:
# to pandas conversion. Indepent objects
print(type(df))
print(type(df.toPandas()))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


In [32]:
# Lazy eval
df.a

# Most operations return columns (so Column just represent a Columnar operation)
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [33]:
type(df.c)

pyspark.sql.column.Column

In [34]:
df.select(df.c) # Shows data only because of setting  spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

c
string1
string2
string3


In [35]:
df.select(upper(df.c))

upper(c)
STRING1
STRING2
STRING3


In [36]:
type(upper(df.c))

pyspark.sql.column.Column

In [38]:
help(upper) # Upper is a spark func... added in env py Session call???

Help on function upper in module pyspark.sql.functions:

upper(col: 'ColumnOrName') -> pyspark.sql.column.Column
    Converts a string expression to upper case.
    
    .. versionadded:: 1.5



In [40]:
# Works a bit like pandas
df.filter(df.a > 1).show()

+---+---+-------+----------+
|  a|  b|      c|         d|
+---+---+-------+----------+
|  2|3.0|string2|2000-02-01|
|  3|4.0|string3|2000-03-01|
+---+---+-------+----------+



In [42]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

# Support pandas ufuncs to do a .mapping
df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



In [45]:
# Takes in an Iterator on the lines, spits out another iterator of lines
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

# Need to tell schema because pandas has not clue what we return in yield...
df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+
|  a|  b|      c|         d|
+---+---+-------+----------+
|  1|2.0|string1|2000-01-01|
+---+---+-------+----------+



In [47]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [48]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [50]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



In [58]:
df.write.mode("overwrite").csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  7| 70|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
|  red| grape|  8| 80|
+-----+------+---+---+



In [59]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [60]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").take(3)

23/01/15 15:51:48 WARN SimpleFunctionRegistry: The function add_one replaced a previously registered function.


                                                                                

[Row(add_one(v1)=2), Row(add_one(v1)=3), Row(add_one(v1)=4)]

In [61]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+



23/01/15 16:37:16 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 920023 ms exceeds timeout 120000 ms
23/01/15 16:37:16 WARN SparkContext: Killing executors is not supported by current scheduler.
