In [1]:
from datetime import datetime, date
from pyspark.sql import SparkSession, Row, functions as F
from time import perf_counter
import pandas as pd
import logging

# type hint thingies
from typing import Iterator

In [2]:
# Spark session & context

# create it in local mode
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext
spark.conf.set('spark.sql.repl.eagerEval.enabled', False)

In [3]:
# Sum of the first 100 whole numbers
rdd = sc.parallelize(range(200 + 1))

start = perf_counter()
print("Sum:", rdd.sum())
print(f"Took {perf_counter()-start}ms")

Sum: 20100
Took 0.9798703830019804ms


In [4]:
# Creates dataframe. No explicit schema specified - will sample row to get schema instead
# Alternatively can pass in `schema` param
# Dataframe can be created from pandas DF as well
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [5]:
# Pandaas dataframe is created col by col
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [6]:
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [7]:
# eager evaluation - displays like this in jupyter
# spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
# df

In [8]:
# Configure logging - find non-worker logs in docker container
logging.basicConfig(level=logging.INFO, filename="log.txt")
logger = logging.getLogger(__name__)
logger.info("test from outside")

def pandas_filter_func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    logger.info("test from func") # not sure how to see these
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



In [9]:
sc.uiWebUrl

'http://b622381e56b9:4040'

In [10]:
df2 = spark.createDataFrame([
    Row('don','baltimore',12),
    Row('jerry','boston',19),
    Row('bob','baltimore',99),
    Row('cameron','baltimore',13),
    Row('james','seattle',1),
    Row('peter','seattle',2),
], schema = 'name: string, city: string, id: long')

df2

DataFrame[name: string, city: string, id: bigint]

In [11]:
df2.groupBy('city').count().show()

+---------+-----+
|     city|count|
+---------+-----+
|   boston|    1|
|  seattle|    2|
|baltimore|    3|
+---------+-----+



In [12]:
df2.groupby("city").agg({'name': 'collect_set'}).show()

+---------+-------------------+
|     city|  collect_set(name)|
+---------+-------------------+
|   boston|            [jerry]|
|  seattle|     [james, peter]|
|baltimore|[don, cameron, bob]|
+---------+-------------------+



In [13]:
df2.groupby("city").agg(F.collect_set('name')).show()

+---------+-------------------+
|     city|  collect_set(name)|
+---------+-------------------+
|   boston|            [jerry]|
|  seattle|     [james, peter]|
|baltimore|[don, cameron, bob]|
+---------+-------------------+



In [14]:
# df3 = spark.read.csv("./work/data/customers-100000.csv", header=True, inferSchema=True)
df3 = spark.read.csv("./work/data/customers-100.csv", header=True, inferSchema=True)

# df3.show(5)

In [15]:
print("df4 " + str(df3.filter(F.col('First Name').startswith('S')).count()))
print("df5 " + str(df3.filter(F.col('Last Name').startswith('B')).count()))

df4 3
df5 2


In [16]:
df4 = df3.filter(F.col('First Name').startswith('S'))
df5 = df3.filter(F.col('Last Name').startswith('B'))

df4.intersect(df5).show(5)

+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+
|Index|Customer Id|First Name|Last Name|Company|City|Country|Phone 1|Phone 2|Email|Subscription Date|Birthday|Website|
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+



In [17]:
df3.cache()
df3.count()
df4 = df3.filter(F.col('First Name').startswith('S'))
df5 = df3.filter(F.col('Last Name').startswith('B'))

df4.intersect(df5).show(5)

+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+
|Index|Customer Id|First Name|Last Name|Company|City|Country|Phone 1|Phone 2|Email|Subscription Date|Birthday|Website|
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+



In [18]:
df6 = df3.filter(F.col('First Name').startswith('S') | F.col('First Name').startswith('B'))
df6.cache()
df4 = df6.filter(F.col('First Name').startswith('S'))
df5 = df6.filter(F.col('Last Name').startswith('B'))

df4.intersect(df5).show(5)

+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+
|Index|Customer Id|First Name|Last Name|Company|City|Country|Phone 1|Phone 2|Email|Subscription Date|Birthday|Website|
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+--------+-------+



In [19]:
df3 \
    .withColumn('Subscribed', F.datediff(F.current_date(), F.col('Subscription Date')) / 365) \
    .withColumn('Subscribed', F.floor(F.col('Subscribed'))) \
    .groupBy('Subscribed') \
    .count() \
    .orderBy('Subscribed') \
    .show()

+----------+-----+
|Subscribed|count|
+----------+-----+
|         0|   10|
|         1|    6|
|         2|   11|
|         3|   11|
|         4|    8|
|         5|   11|
|         6|   12|
|         7|   12|
|         8|    9|
|         9|   10|
+----------+-----+



In [20]:
df3.groupBy('Last Name').count().orderBy(F.col('count').desc()).show(5)
df3.groupBy('First Name').count().orderBy(F.col('count').desc()).show(5)

+---------+-----+
|Last Name|count|
+---------+-----+
|   Murphy|    4|
|    Ortiz|    2|
| Williams|    2|
|  Sanchez|    2|
|    Smith|    2|
+---------+-----+
only showing top 5 rows

+----------+-----+
|First Name|count|
+----------+-----+
|     James|    3|
|    Daniel|    3|
|    Jeremy|    3|
|   Anthony|    2|
|   Charles|    2|
+----------+-----+
only showing top 5 rows



In [21]:

df3.groupBy('Last Name').count().orderBy(F.col('count').asc()).show(5)
df3.groupBy('First Name').count().orderBy(F.col('count').asc()).show(5)


+---------+-----+
|Last Name|count|
+---------+-----+
|   Grimes|    1|
|   Porter|    1|
| Harrison|    1|
|    Scott|    1|
|    Jones|    1|
+---------+-----+
only showing top 5 rows

+----------+-----+
|First Name|count|
+----------+-----+
|    Nathan|    1|
|   Rebecca|    1|
|   Matthew|    1|
|    Amanda|    1|
|      Gina|    1|
+----------+-----+
only showing top 5 rows



In [24]:
sc.stop()
spark.stop()