In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
import sys
import pandas

# Cluster sizing noted by the author: (8 cores, 16gb per machine) x 5 = 40 cores
# NOTE(review): the session below is created with master("local"), so that
# sizing is not what this notebook actually runs on — confirm the intended master.

# DataFrame-era entry point, built through the SparkSession builder.
# Reference video: https://youtu.be/_1byVWTEK1s
spark_session = (
    SparkSession.builder
    .master("local")
    .appName("Learning")
    .getOrCreate()
)

# Legacy entry points (RDD context and SQLContext) derived from the same session.
spark_context = spark_session.sparkContext
sqlContext = SQLContext(spark_context)

In [None]:
# Show the kernel's current working directory (bare `pwd` relies on IPython automagic).
pwd

In [None]:
# Read the zip-code JSON file and register it with Spark SQL as the temp view "zip".
(
    sqlContext.read
    .json("/home/ubuntu/LDSA/lab2/Test/zips.json")
    .createOrReplaceTempView("zip")
)
# Older spelling of the same registration, kept for reference:
#sqlContext.read.json("/home/ubuntu/LDSA/lab2/Test/zips.json").registerTempTable("zip")

In [None]:
# Run DESCRIBE on the "zip" view and print the result.
sqlContext.sql("DESCRIBE zip").show()

In [None]:
# Re-register the "zip" view with its "_id" column renamed to "zip".
(
    sqlContext.table("zip")
    .withColumnRenamed("_id", "zip")
    .createOrReplaceTempView("zip")
)

In [None]:
# Per-city zip count and total population for state 'IL', ten largest totals first.
(
    sqlContext
    .sql("SELECT COUNT(zip), SUM(pop), city FROM zip WHERE state = 'IL' GROUP BY city ORDER BY SUM(pop) DESC LIMIT 10")
    .show()
)

In [None]:
# Issue a CACHE TABLE statement for the "zip" view through Spark SQL.
sqlContext.sql("CACHE TABLE zip")

In [None]:
# These are from: http://changhsinlee.com/pyspark-udf/

In [3]:
def square(x):
    """Return ``x`` raised to the second power."""
    return pow(x, 2)

In [4]:
from pyspark.sql.types import IntegerType
# Spark UDF that applies square(); the result column is declared as IntegerType.
square_udf_int = udf(
    f=lambda value: square(value),
    returnType=IntegerType(),
)

In [5]:
# float type output
from pyspark.sql.types import FloatType
# Spark UDF that applies square(); the result column is declared as FloatType.
square_udf_float = udf(
    f=lambda value: square(value),
    returnType=FloatType(),
)

In [6]:
from pyspark.sql.types import ArrayType

def square_list(x):
    """Return a new list with each element of ``x`` converted to float and squared."""
    squares = []
    for item in x:
        squares.append(float(item) ** 2)
    return squares

# Spark UDF that applies square_list(); the result is declared as an array of floats.
square_list_udf = udf(
    f=lambda values: square_list(values),
    returnType=ArrayType(FloatType()),
)

# Example usage (needs a DataFrame `df` with an 'integer_arrays' column):
#df.select('integer_arrays', square_list_udf('integer_arrays')).show()


In [None]:
import string

def convert_ascii(number):
    """Pair ``number`` with the character at that index of ``string.ascii_letters``.

    Returns a two-element list ``[number, letter]``.
    """
    letter = string.ascii_letters[number]
    return [number, letter]

# Quick check in the notebook: index 1 maps to the second letter.
convert_ascii(1)

In [None]:
# The schema classes are imported here explicitly: no earlier cell imports
# StructType, StructField or StringType, so this cell would otherwise raise a
# NameError on a fresh kernel.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Struct schema describing the [number, letter] pair returned by convert_ascii().
array_schema = StructType([
    StructField('number', IntegerType(), nullable=False),
    StructField('letters', StringType(), nullable=False)
])

# Spark UDF that applies convert_ascii() and returns a struct matching array_schema.
spark_convert_ascii = udf(lambda z: convert_ascii(z), array_schema)

# Example usage (needs a DataFrame `df` with an 'integers' column):
#df_ascii = df.select('integers', spark_convert_ascii('integers').alias('ascii_map'))
#df_ascii.show()


In [None]:
import numpy as np
# pandas is imported only as `pandas` at the top of the notebook; the `pd`
# alias used below has to be bound here or the cell fails with a NameError.
import pandas as pd

# Example data: a single column whose rows are variable-length integer lists.
d_np = pd.DataFrame({'int_arrays': [[1,2,3], [4,5]]})
# The session object created in the first cell is named `spark_session`
# (there is no `spark` variable in this notebook).
df_np = spark_session.createDataFrame(d_np)
df_np.show()

In [None]:
# From https://docs.databricks.com/spark/latest/spark-sql/udf-in-python.html

In [None]:
def squared(s):
  """Return ``s`` multiplied by itself."""
  product = s * s
  return product
# Expose squared() to Spark SQL under the name "squaredWithPython".
# NOTE(review): no return type is passed here — PySpark documents StringType as
# the default for register(); confirm that a string result is intended.
sqlContext.udf.register("squaredWithPython", squared)

In [None]:
from pyspark.sql.types import LongType
def squared_typed(s):
  """Return ``s`` multiplied by itself; registered below with a LongType result."""
  return s * s
# Register the function defined in this cell. Passing `squared` here would
# leave squared_typed unused and make the cell depend on another cell's function.
sqlContext.udf.register("squaredWithPython", squared_typed, LongType())

In [None]:
# Register the ids 1..19 as the temp view "test". createOrReplaceTempView is
# used instead of the deprecated registerTempTable, matching the "zip" view cells.
sqlContext.range(1, 20).createOrReplaceTempView("test")

In [None]:
# `%sql` is a Databricks notebook magic and no extension providing it is loaded
# in this notebook, so the query is run through sqlContext.sql() instead.
sqlContext.sql("select id, squaredWithPython(id) as id_squared from test").show()

In [None]:
# `%sql` is a Databricks notebook magic and no extension providing it is loaded
# in this notebook, so the query is run through sqlContext.sql() instead.
sqlContext.sql("select * FROM test").show()

In [None]:
from pyspark.sql.functions import udf
# LongType is imported here so the cell does not depend on another cell's import.
from pyspark.sql.types import LongType

# DataFrame-API counterpart of the SQL-registered UDF: wrap squared() with a
# LongType result and apply it to the "id" column of the "test" view.
squared_udf = udf(squared, LongType())
df = sqlContext.table("test")
# .show() prints the rows; Databricks' display() is not available in plain
# Jupyter, where IPython's display() would only render the DataFrame's repr.
df.select("id", squared_udf("id").alias("id_squared")).show()

In [None]:
# Example: looking values up by numeric range with RangeKeyDict.
from range_key_dict import RangeKeyDict

# The guard must use the dunder names; `_name_` / `_main_` (single underscores)
# raises a NameError because `_name_` is never defined.
if __name__ == '__main__':
    # Each key is a (start, end) pair mapped to a label.
    range_key_dict = RangeKeyDict({
        (0, 100): 'A',
        (100, 200): 'B',
        (200, 300): 'C',
    })

    # test normal case
    assert range_key_dict[70] == 'A'
    assert range_key_dict[170] == 'B'
    assert range_key_dict[270] == 'C'

    # test case when the number is float
    assert range_key_dict[70.5] == 'A'

    # test case not in the range, with default value
    assert range_key_dict.get(1000, 'D') == 'D'

In [None]:
# From: https://github.com/benblamey/jupyters/blob/master/ben-spark-master/jupyter/Teaching/Lecture2_Spark_DataFrames_Weather_Demo.ipynb

In [None]:
# StringType is imported here explicitly; no earlier cell in this notebook imports it.
from pyspark.sql.types import StringType

# NOTE(review): WND_to_WND_SPEED_MS and data_frame are not defined in the visible
# cells; presumably they come from the weather-demo notebook referenced above —
# define or load them before running this cell.
udf_WND_to_WND_SPEED_MS = udf(WND_to_WND_SPEED_MS, StringType())


# Add a "WND_SPEED_MS" column computed from the "WND" column.
data_frame_with_wnd_speed = data_frame.withColumn("WND_SPEED_MS",udf_WND_to_WND_SPEED_MS("WND"))

# Drop rows whose wind speed is the "missing" sentinel.
# 9999: missing (with scale factor of 10)
# NOTE(review): the UDF is declared as StringType but the column is compared with
# the integer 999; presumably Spark casts implicitly — confirm, or declare a
# numeric return type.
data_frame_with_wnd_speed = data_frame_with_wnd_speed.filter(data_frame_with_wnd_speed['WND_SPEED_MS'] < 999)

# Alternative without a UDF, kept for reference:
#wnd_split = pyspark.sql.functions.split(data_frame['WND'], ',')
#data_frame_with_wnd_speed = data_frame.withColumn('WND_SPEED_MS', wnd_split.getItem(3))

data_frame_with_wnd_speed.select('WND', 'WND_SPEED_MS').show()

data_frame_with_wnd_speed.select('WND_SPEED_MS').summary().show()