# Spark SQL API

In [1]:
from pyspark.sql import SparkSession
spark = (SparkSession
     .builder
     .master('local[*]')
     .getOrCreate())

22/02/16 19:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
trips = spark.read.csv(
    '/mnt/data/public/nyctaxi/all/green_tripdata_2017-1*.csv',
    sep=',', header=True
)

In [3]:
trips.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)



In [4]:
tweets = spark.read.json(
    '/mnt/data/public/twitter/sample/data-1909302*.json.bz2'
)

22/02/16 19:20:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
tweets.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- delete: struct (nullable = true)
 |    |-- status: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- user_id: long (nullable = true)
 |    |    |-- user_id_str: string (nullable = true)
 |    |-- timestamp_ms: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-

Spark data frames can be handled similar to SQL tables. The `select` and `withColumn` methods can add or modify columns by passing Spark SQL functions.

In [6]:
from pyspark.sql.functions import hour, round
(trips.select('RateCodeID', 
              trips.PULocationID.astype('int'),
              trips['DOLocationID'].alias('dropoff'),
              hour('lpep_pickup_datetime'))
      .withColumn('double code', trips['RateCodeID'] * 2)
      .where(trips['RateCodeID'] == 1)
      .limit(10)
      .show())

+----------+------------+-------+--------------------------+-----------+
|RateCodeID|PULocationID|dropoff|hour(lpep_pickup_datetime)|double code|
+----------+------------+-------+--------------------------+-----------+
|         1|         264|    193|                         0|        2.0|
|         1|          92|    230|                         0|        2.0|
|         1|         112|    157|                         0|        2.0|
|         1|         255|     48|                         0|        2.0|
|         1|         260|    129|                         0|        2.0|
|         1|         129|    129|                         0|        2.0|
|         1|          97|     49|                         0|        2.0|
|         1|         255|     37|                         0|        2.0|
|         1|         106|     79|                         0|        2.0|
|         1|           7|    223|                         0|        2.0|
+----------+------------+-------+------------------

## User-defined functions

To use an arbitrary function to process a row or group of columns, it has to be defined as a user-defined function (UDF).

In [7]:
from pyspark.sql.functions import udf

@udf
def pair_code(pu, do): 
    return f'{pu}-{do}'

The UDF accepts columns as parameters whilst the decorated python function will receive the row values of the column parameters.

In [8]:
(trips
     .select(pair_code('PULocationID', 'DOLocationID'))
     .show()
)

[Stage 4:>                                                          (0 + 1) / 1]

+-------------------------------------+
|pair_code(PULocationID, DOLocationID)|
+-------------------------------------+
|                              264-193|
|                               92-230|
|                              112-157|
|                               255-48|
|                              260-129|
|                              129-129|
|                                97-49|
|                               255-37|
|                               106-79|
|                                7-223|
|                                223-7|
|                               49-233|
|                                25-49|
|                                25-33|
|                              179-260|
|                              179-179|
|                              112-112|
|                               112-49|
|                               256-79|
|                              260-198|
+-------------------------------------+
only showing top 20 rows



Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

The `returnType` of the `udf` decorator can also be specified and the default is `StringType`. To create nested fields, we can create a `StructType`.

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

UserInfoType = StructType([
    StructField('id', IntegerType()),
    StructField('user', StructType([
        StructField('name', StringType()),
        StructField('description', StringType())
    ]))
])

UserInfoType

StructType(List(StructField(id,IntegerType,true),StructField(user,StructType(List(StructField(name,StringType,true),StructField(description,StringType,true))),true)))

In [10]:
@udf(UserInfoType)
def collect_user_info(user_id, user_name, user_description):
    return (user_id, (user_name, user_description))

In [11]:
(tweets
    .select(collect_user_info('user.id', 'user.name', 'user.description'))
    .show())

+-------------------------------------------------------+
|collect_user_info(user.id, user.name, user.description)|
+-------------------------------------------------------+
|                                   {1394088344, {Un ...|
|                                   {238265400, {Jose...|
|                                   {286588928, {crys...|
|                                   {1683464193, {S1l...|
|                                   {-984289279, {𝕽?...|
|                                 {-1911214079, {みん...|
|                                   {null, {null, null}}|
|                                   {135716865, {Ayom...|
|                                   {1733398962, {Don...|
|                                   {-2053718016, {ma...|
|                                   {1834409990, {كلا...|
|                               {643145728, {怠惰な堕...|
|                                   {254860578, {BW M...|
|                                   {19426342, {Zeno ...|
|                    

[Stage 5:>                                                          (0 + 1) / 1]                                                                                

We can also make the UDF work directly with pandas Series and DataFrames by making it a pandas UDF. However, the relevant column, group or dataframe will be passed to the executor so beware of possible out of memory errors.

In [12]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType=StringType())
def bin_values(s: pd.Series) -> pd.Series:
    return pd.cut(s, 10).astype(str)

In [13]:
(trips
    .select(bin_values(trips.trip_distance.astype('float')).alias('bin'))
    .show())

[Stage 6:>                                                          (0 + 1) / 1]

+----------------+
|             bin|
+----------------+
|(-0.0528, 5.283]|
|(10.566, 15.849]|
|(-0.0528, 5.283]|
| (5.283, 10.566]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
| (5.283, 10.566]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
| (5.283, 10.566]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
|(-0.0528, 5.283]|
| (5.283, 10.566]|
+----------------+
only showing top 20 rows



                                                                                

We may also call a Python function directly to work with the entire data or group as a pandas DataFrame. However, this method is experimental and is new in Spark 3. It also requires a shuffle and all the data of a group should fit on memory.

In [14]:
def standardize_fares(s):
    return pd.DataFrame({'normed_fare': s['fare_amount'].astype(float) /
                         s['fare_amount'].astype(float).max()})

In [15]:
(trips
   .groupby('RatecodeID')
   .applyInPandas(standardize_fares, schema="normed_fare FLOAT")
   .show()
)

                                                                                

+-----------+
|normed_fare|
+-----------+
| 0.03984064|
|  0.1503984|
|-0.03984064|
| 0.03984064|
| 0.03984064|
|0.052788846|
| 0.12051793|
| 0.17629482|
| 0.16633466|
| 0.19223107|
|   0.187251|
| 0.03984064|
| 0.17131475|
| 0.18326694|
| 0.13147411|
| 0.17729084|
| 0.03984064|
| 0.03984064|
| 0.03984064|
| 0.13446215|
+-----------+
only showing top 20 rows





**Problem 1**

Create a function `get_a_counter_udf` that returns a UDF. The UDF accepts a string column and returns the number of times, as an integer, the letter `a` is found in the column in a case-sensitive fashion.

In [16]:
def get_a_counter_udf():
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    import re
    return udf(lambda x: len(re.findall('a', x, re.I)) if x is not None else 0, returnType=IntegerType())

In [17]:
from numpy.testing import assert_equal

a_counter_udf = get_a_counter_udf()
pdf_a_counter = (
    tweets
     .select('id', a_counter_udf('text').alias('a_count'))
     .orderBy(tweets.id.asc_nulls_last())
     .limit(20)
     .toPandas()
)
assert_equal(
    pdf_a_counter.head(10).to_numpy().tolist(),
    [[1178762246899544069, 1],
     [1178762251077062656, 6],
     [1178762251077128199, 2],
     [1178762251077132289, 2],
     [1178762251081146368, 1],
     [1178762251081322500, 1],
     [1178762251085451271, 0],
     [1178762251085463557, 17],
     [1178762251085500417, 10],
     [1178762251085500418, 2]]
)

                                                                                

**Problem 2**

Create a function `get_zscore_udf` that returns a pandas UDF. The UDF accepts a float column and returns the $z$-score.

In [18]:
def get_zscore_udf():
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import FloatType
    return pandas_udf(lambda x: (x-x.mean())/x.std(), returnType=FloatType())

In [19]:
from numpy.testing import assert_almost_equal

zscore_udf = get_zscore_udf()
pdf_zscore = (
    trips
        .select(zscore_udf(trips.fare_amount.astype('float')).alias('fare'))
        .orderBy(trips.lpep_pickup_datetime, trips.lpep_dropoff_datetime)
        .limit(10)
        .toPandas()
)
assert_almost_equal(
    pdf_zscore.iloc[:5].to_numpy().tolist(),
    [[0.008557479828596115],
     [-0.6729615330696106],
     [0.04841563478112221],
     [-0.18859802186489105],
     [-0.6502535939216614]]
)

                                                                                

**Problem 3**

Create a function `get_fare_udf` that returns a pandas UDF. The UDF accepts `fare_amount`, `extra`, `mta_tax`, `tip_amount`, `tolls_amount`, `ehail_fee`, `improvement_surcharge` and `total_amount`, and returns a struct with float fields corresponding to the input parameters.

In [20]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

UserInfoType = StructType([
    StructField('id', IntegerType()),
    StructField('user', StructType([
        StructField('name', StringType()),
        StructField('description', StringType())
    ]))
])

UserInfoType

StructType(List(StructField(id,IntegerType,true),StructField(user,StructType(List(StructField(name,StringType,true),StructField(description,StringType,true))),true)))

In [21]:
def get_fare_udf():
    from pyspark.sql.types import FloatType
    from pyspark.sql.functions import pandas_udf, udf
    struc = StructType([
        StructField('fare_amount', FloatType()),
        StructField('extra', FloatType()),
        StructField('mta_tax', FloatType()),
        StructField('tip_amount', FloatType()),
        StructField('tolls_amount', FloatType()),
        StructField('ehail_fee', FloatType()),
        StructField('improvement_surcharge', FloatType()),
        StructField('total_amount', FloatType())
    ])
   
    return udf(lambda fa, ex, mt, ti, to, eh, im , tot: (fa, ex, mt, ti, to, eh, im, tot), struc)

In [22]:
from pyspark.sql.types import FloatType
fare_udf = get_fare_udf()
df_fare = (
    trips
        .orderBy(trips.lpep_pickup_datetime, trips.lpep_dropoff_datetime)
        .select(
            fare_udf(
                trips.fare_amount.astype('float'),
                trips.extra.astype('float'),
                trips.mta_tax.astype('float'),
                trips.tip_amount.astype('float'),
                trips.tolls_amount.astype('float'),
                trips.ehail_fee.astype('float'),
                trips.improvement_surcharge.astype('float'),
                trips.total_amount.astype('float')).alias('fare'))
        .limit(10)
)
assert_equal(
    df_fare.schema,
    StructType([
        StructField('fare',
            StructType([
                StructField('fare_amount',FloatType(), True),
                StructField('extra',FloatType(), True),
                StructField('mta_tax',FloatType(), True),
                StructField('tip_amount',FloatType(), True),
                StructField('tolls_amount',FloatType(), True),
                StructField('ehail_fee',FloatType(), True),
                StructField('improvement_surcharge',FloatType(), True),
                StructField('total_amount',FloatType(), True)
            ]),
        True)
    ])
)
pdf_fare = df_fare.toPandas()
assert_equal(
    pdf_fare.iloc[:5].to_numpy().squeeze().tolist(),
    [[11.0, 1.0, 0.5, 0.0, 0.0, None, 0.30000001192092896,
      12.800000190734863],
     [1.100000023841858, 0.0, 0.5, 0.0, 0.0, None, 0.30000001192092896,
      1.899999976158142],
     [11.5, 0.5, 0.5, 0.0, 0.0, None, 0.30000001192092896,
      12.800000190734863],
     [10.5, 0.0, 0.5, 0.0, 0.0, None, 0.30000001192092896,
      11.300000190734863],
     [5.5, 0.0, 0.5, 0.0, 0.0, None, 0.30000001192092896, 6.300000190734863]]
)

  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

## SQL statements

You can also call SQL statements to process a dataframe. To do so, a dataframe has to be added to the catalog first.

In [23]:
trips.createOrReplaceTempView('trips')

In [24]:
tweets.createOrReplaceTempView('tweets')

In [25]:
(spark.sql(
    """
SELECT
    RateCodeID,
    CAST(PULocationID as INT),
    DOLocationID as dropoff,
    HOUR(lpep_pickup_datetime)
FROM trips
WHERE RateCodeID == 1
LIMIT 10
    """)
    .withColumn('double code', trips['RateCodeID'] * 2)
    .show())

+----------+------------+-------+---------------------------------------------+-----------+
|RateCodeID|PULocationID|dropoff|hour(CAST(lpep_pickup_datetime AS TIMESTAMP))|double code|
+----------+------------+-------+---------------------------------------------+-----------+
|         1|         264|    193|                                            0|        2.0|
|         1|          92|    230|                                            0|        2.0|
|         1|         112|    157|                                            0|        2.0|
|         1|         255|     48|                                            0|        2.0|
|         1|         260|    129|                                            0|        2.0|
|         1|         129|    129|                                            0|        2.0|
|         1|          97|     49|                                            0|        2.0|
|         1|         255|     37|                                            0| 

In [26]:
(spark.sql(
    """
    SELECT
        RateCodeID,
        CAST(PULocationID as INT),
        DOLocationID as dropoff,
        HOUR(lpep_pickup_datetime)
    FROM trips
    WHERE RateCodeID == 1
    LIMIT 10
    """)
    .withColumn('double code', trips['RateCodeID'] * 2)
    .show())

+----------+------------+-------+---------------------------------------------+-----------+
|RateCodeID|PULocationID|dropoff|hour(CAST(lpep_pickup_datetime AS TIMESTAMP))|double code|
+----------+------------+-------+---------------------------------------------+-----------+
|         1|         264|    193|                                            0|        2.0|
|         1|          92|    230|                                            0|        2.0|
|         1|         112|    157|                                            0|        2.0|
|         1|         255|     48|                                            0|        2.0|
|         1|         260|    129|                                            0|        2.0|
|         1|         129|    129|                                            0|        2.0|
|         1|          97|     49|                                            0|        2.0|
|         1|         255|     37|                                            0| 

The execution plan can be visualized using `explain`. This can be done for all Spark DataFrames not just for those that use a SQL statement.

In [27]:
(spark.sql(
    """
SELECT
    RateCodeID,
    CAST(PULocationID as INT),
    DOLocationID as dropoff,
    HOUR(lpep_pickup_datetime)
FROM trips
WHERE RateCodeID == 1
LIMIT 10
    """)
    .withColumn('double code', trips['RateCodeID'] * 2)
    .explain(mode='formatted'))

== Physical Plan ==
* Project (7)
+- * GlobalLimit (6)
   +- Exchange (5)
      +- * LocalLimit (4)
         +- * Project (3)
            +- * Filter (2)
               +- Scan csv  (1)


(1) Scan csv 
Output [4]: [lpep_pickup_datetime#17, RatecodeID#20, PULocationID#21, DOLocationID#22]
Batched: false
Location: InMemoryFileIndex [file:/mnt/data/public/nyctaxi/all/green_tripdata_2017-10.csv, ... 2 entries]
PushedFilters: [IsNotNull(RatecodeID)]
ReadSchema: struct<lpep_pickup_datetime:string,RatecodeID:string,PULocationID:string,DOLocationID:string>

(2) Filter [codegen id : 1]
Input [4]: [lpep_pickup_datetime#17, RatecodeID#20, PULocationID#21, DOLocationID#22]
Condition : (isnotnull(RateCodeID#20) AND (cast(RateCodeID#20 as int) = 1))

(3) Project [codegen id : 1]
Output [4]: [RateCodeID#20, cast(PULocationID#21 as int) AS PULocationID#362, DOLocationID#22 AS dropoff#361, hour(cast(lpep_pickup_datetime#17 as timestamp), Some(Asia/Manila)) AS hour(CAST(lpep_pickup_datetime AS TIMESTAMP

UDFs must be registered first before it can be used in a SQL statement.

In [28]:
spark.udf.register("pair_code", pair_code);

In [29]:
spark.sql("""
SELECT pair_code(PULocationID, DOLocationID)
FROM trips
LIMIT 10
""").show()

+-------------------------------------+
|pair_code(PULocationID, DOLocationID)|
+-------------------------------------+
|                              264-193|
|                               92-230|
|                              112-157|
|                               255-48|
|                              260-129|
|                              129-129|
|                                97-49|
|                               255-37|
|                               106-79|
|                                7-223|
+-------------------------------------+



Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [30]:
spark.udf.register('double_distance', lambda x: x*2, 'FLOAT');

In [31]:
spark.sql("""
SELECT double_distance(CAST(trip_distance AS FLOAT))
FROM trips
LIMIT 10
""").show()

+---------------------------------------------+
|double_distance(cast(trip_distance as float))|
+---------------------------------------------+
|                                          0.0|
|                                        27.96|
|                                         3.22|
|                                        14.46|
|                                         4.62|
|                                          2.6|
|                                         1.78|
|                                          3.9|
|                                         11.4|
|                                          3.4|
+---------------------------------------------+



Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/us

In [32]:
spark.sql("""
SELECT
    double_distance(CAST(trip_distance AS FLOAT)) AS dd,
    (CASE
        WHEN
            double_distance(CAST(trip_distance AS FLOAT)) > 10
        THEN 
            'long'
        ELSE
            'short'
    END) AS trip_type
FROM trips
LIMIT 10
""").show()

+-----+---------+
|   dd|trip_type|
+-----+---------+
|  0.0|    short|
|27.96|     long|
| 3.22|    short|
|14.46|     long|
| 4.62|    short|
|  2.6|    short|
| 1.78|    short|
|  3.9|    short|
| 11.4|     long|
|  3.4|    short|
+-----+---------+



Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [33]:
spark.udf.register('bin_values', bin_values)

<function __main__.bin_values(s: pandas.core.series.Series) -> pandas.core.series.Series>

In [34]:
spark.sql("""
SELECT bin_values(CAST(trip_distance AS FLOAT))
FROM trips
LIMIT 10
""").show()

+----------------------------------------+
|bin_values(cast(trip_distance as float))|
+----------------------------------------+
|                        (-0.0528, 5.283]|
|                        (10.566, 15.849]|
|                        (-0.0528, 5.283]|
|                         (5.283, 10.566]|
|                        (-0.0528, 5.283]|
|                        (-0.0528, 5.283]|
|                        (-0.0528, 5.283]|
|                        (-0.0528, 5.283]|
|                         (5.283, 10.566]|
|                        (-0.0528, 5.283]|
+----------------------------------------+



**Problem 4**

Create a function `ave_amount` that returns a SQL statement that will process `tweets` to create a table with columns `PULocationID` and `mean fare amount`, which contains the average fare amount per `PULocationID`.  Sort by `PULocationID`.

In [35]:
def ave_amount():
    statement = f"""
    
    SELECT CAST(PULocationID AS INT), AVG(fare_amount) AS `mean fare amount`
    FROM trips
    GROUP BY PULocationID 
    ORDER BY PULocationID 
    """
    return statement

In [36]:
from numpy.testing import assert_array_almost_equal

mean_fare = spark.sql(ave_amount()).toPandas()

assert_equal(len(mean_fare), 255)
assert_equal(mean_fare.columns.tolist(), ['PULocationID', 'mean fare amount'])
assert_equal(mean_fare['PULocationID'].to_numpy()[:10], 
                   [1, 2, 3, 5, 6, 7, 8, 9, 10, 11])
assert_array_almost_equal(
    mean_fare['mean fare amount'].to_numpy()[:10],
    [72.33266667, 61.5       , 17.55557656, 42.83333333, 35.77777778,
     10.08359857, 13.625     , 24.45208845, 19.82410309, 21.73911883])

                                                                                

**Problem 5**

Create a function `pickup` that returns a SQL statement to process `trips` into a data frame with columns `pickup_hour` and `pickup_location`. The column `pickup_hour` is of `StringType` and corresponds to `lpep_pickup_datetime` rounded down to the nearest hour and the column `pickup_location` is the array of unique `PULocationID` for that `pickup_hour` sorted by `PULocationID`. Sort the resulting data frame by `pickup_hour`. 

In [37]:
def pickup():
    statement = """
    
    SELECT 
        CAST(
            date_trunc('Hour', to_timestamp(lpep_pickup_datetime)) as string
            ) as pickup_hour,
        sort_array(collect_set(CAST(PULocationID as INT))) as pickup_locations
    FROM trips
    GROUP BY pickup_hour
    ORDER BY pickup_hour
    """
    return statement

In [38]:
pdf_pickup = spark.sql(pickup()).toPandas()
assert_equal(len(pdf_pickup), 2286)
assert_equal(pdf_pickup.columns.tolist(), ['pickup_hour', 'pickup_locations'])
assert_equal(
    pdf_pickup.iloc[:10].to_numpy().tolist(),
    [['2008-12-31 18:00:00', [37]],
     ['2008-12-31 23:00:00', [95, 181, 264]],
     ['2009-01-01 00:00:00', [95, 181, 193, 260]],
     ['2009-01-01 01:00:00', [7, 260]],
     ['2009-01-01 02:00:00', [260]],
     ['2009-01-01 05:00:00', [181]],
     ['2009-01-01 07:00:00', [181]],
     ['2009-01-01 08:00:00', [181]],
     ['2009-01-01 09:00:00', [85, 181]],
     ['2009-01-01 10:00:00', [181]]]
)

                                                                                

**Problem 6**

Create a function `friendliest` that returns a SQL statement to process `tweets` into a data frame with columns `screen_name` and `friends_count` and rows corresponding to the ten unique `screen_name`s with the most `friends_count`. Sort by decreasing `friends_count` then by `screen_name`. Your code should be a single continuous Spark DataFrame method call chain.

In [39]:
def friendliest():
    statement = """
    
    SELECT user.screen_name as screen_name, CAST(MAX(user.friends_count) AS INT) AS friends_count
    FROM tweets
    GROUP BY user.screen_name
    ORDER BY friends_count DESC
    LIMIT 10
    
    """
    return statement

In [40]:
flist = spark.sql(friendliest()).toPandas()
assert_equal(flist.columns.tolist(), ['screen_name', 'friends_count'])
assert_equal(
    flist.iloc[:5].to_numpy().tolist(),
    [['8W88W', 1102946],
     ['amola12200', 832745],
     ['abl911', 513701],
     ['jilevin', 504085],
     ['p6_96', 387465]]
)

                                                                                

**Problem 7**

Create a function `hashiest` that returns a SQL statement to process `tweets` into a data frame with columns `hashtag` and `count`, and rows corresponding to the ten most common hashtags. Sort by decreasing frequency then by hashtag. 

In [41]:
def hashiest():
    statement = """
    
    SELECT H as hashtag, COUNT(H) as `count`
    FROM    (SELECT EXPLODE(entities.hashtags.text) as H
            FROM tweets
            WHERE SIZE(entities.hashtags) != 0
                AND ISNOTNULL(entities.hashtags)
            )
    GROUP BY H
    ORDER BY `count` DESC, H
    LIMIT 10
    
    """
    return statement

In [42]:
hlist = spark.sql(hashiest()).toPandas()
assert_equal(hlist.columns.tolist(), ['hashtag', 'count'])
assert_equal(
    hlist.iloc[:5].to_numpy().tolist(),
    [['PCAs', 413],
     ['MPN', 396],
     ['BTS', 248],
     ['MUNARS', 171],
     ['ARMYSelcaDay', 159]]
)

                                                                                

**Problem 8**

Create a function `count_tokens` that returns a SQL statement to process `texts`, a Spark DataFrame of filepaths and text, into a data frame with columns `id` and `tokens` sorted by `id`. The `id` column corresponds to the id of the book and the `value` column corresponds to the number of tokens in each book, including headers and footers. A token is defined as a sequence of non-whitespace characters.

In [43]:
def count_tokens():
    spark.udf.register('count_tokens', lambda x: len(x.split()), 'INT')
    statement = """
    
    SELECT regexp_extract(filenames, '/([0-9]+).txt', 1) as id, count_tokens(value) as tokens
    
    FROM (
        SELECT INPUT_FILE_NAME() as filenames, value
        FROM texts
        )
    ORDER BY id
    
    """
    return statement

In [44]:
(spark.read.text(
    '/mnt/data/public/gutenberg/1/1/1/*/*/?????.txt', wholetext=True)
    .createOrReplaceTempView('texts'))
pdf_token_counts = spark.sql(count_tokens()).toPandas()
assert_equal(pdf_token_counts.columns.tolist(), ['id', 'tokens'])
assert_equal(
    pdf_token_counts.iloc[:10].to_numpy().tolist(),
    [['11100', 243729],
     ['11101', 36479],
     ['11102', 44876],
     ['11103', 89587],
     ['11104', 53843],
     ['11105', 8506],
     ['11106', 115056],
     ['11107', 14580],
     ['11108', 35843],
     ['11109', 16937]]
)

                                                                                

**Problem 9**

Create a function `count_love` that returns a SQL statement to process `texts`, a Spark DataFrame of strings, into a data frame with columns `has love` and `count`. The column `has love` should have row values `loveful` and `loveless`, in that order. The corresponding `count` value is the number of rows that has the word `love` (case-insensitive, bounded by non-word characters), and the words that doesn't have it, respectively.

In [45]:
def count_love():
    import re
    spark.udf.register('love_counter', lambda x: 'loveful' if re.search(r'\blove\b', x, re.I) else 'loveless')
    statement = """
    
    SELECT label as `has love`, COUNT(label) as `count`
    FROM    (
            SELECT love_counter(value) as label
            FROM texts
            )
    GROUP BY label
    
    """
    return statement

In [46]:
# def count_love():
#     def love_counter(row):
#         if re.search(r'\blove\b', row, re.I):
#             return 'loveful'
#         else:
#             return 'loveless'
        
#     spark.udf.register('love_counter', love_counter)

#     statement ="""
#     SELECT love_counter(value) as label
#     FROM texts
#     limit(2)
#     """
#     return statement

In [47]:
(spark.read.text('/mnt/data/public/gutenberg/1/1/1/*/*/?????.txt')
      .createOrReplaceTempView('texts'))
pdf_love = spark.sql(count_love()).toPandas()
assert_equal(
    pdf_love.columns.tolist(),
    ['has love', 'count']
)
assert_equal(
    pdf_love['has love'].tolist(),
    ['loveful', 'loveless']
)

                                                                                

**Problem 10**

Create a function `count_creation` that returns a SQL statement to process `tweets2` into a data frame with columns `year` corresponding to the creation year of users and `frequency` corresponding to the number of unique users with that creation year. Sort them by decreasing number of users.

In [48]:
def count_creation():
    
    statement = """
    
    SELECT INT(`year`), SIZE(collect_set(id)) as `frequency`
    FROM    (
        SELECT regexp_extract(user.created_at, '([0-9]{4})$', 1) as `year`, user.id as `id`
        FROM tweets2
        WHERE isNotNull(regexp_extract(user.created_at, '([0-9]{4})$', 1))
            AND isNotNull(user.id)
            )
    GROUP BY `year`
    ORDER BY `frequency` DESC

    """
    return statement

In [49]:
# SELECT regexp_extract(user.created_at, '([0-9]{4})$', 1) as `year`, 
#     FROM tweets2
#     WHERE isNotNull(regexp_extract(user.created_at, '([0-9]{4})$', 1))
#     LIMIT 10

In [51]:
(spark.read.json('/mnt/data/public/twitter/sample/'
                 'data-1909302*.json.bz2')
      .createOrReplaceTempView('tweets2'))
create_counts = spark.sql(count_creation()).toPandas()

assert_equal(create_counts.shape, (14, 2))
assert_equal(create_counts.columns.tolist(), ['year', 'frequency'])

                                                                                