In [1]:
from pyspark.sql import SparkSession

# Create (or reuse, if one already exists) the Spark session for this
# notebook, running against the local[*] master under the app name "practice".
spark = (
    SparkSession.builder
    .appName("practice")
    .master("local[*]")
    .getOrCreate()
)

24/02/18 21:42:56 WARN Utils: Your hostname, limhaneul-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.81 instead (on interface en0)
24/02/18 21:42:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/18 21:42:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/18 21:42:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Load one day of retail data. The first CSV line is treated as the header
# and column types are inferred from the data rather than declared up front.
df = spark.read.csv(
    "retail-data/by-day/2010-12-01.csv",
    header="true",
    inferSchema="true",
)

# Show the inferred schema, then a two-row preview of the data.
df.printSchema()
preview = df.limit(2)
preview.show()

                                                                                

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+



In [3]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Show description lines for every invoice except 536365.
#
# InvoiceNo is inferred as a *string* column (see the printed schema), so the
# comparison value must be a string literal as well. Comparing against the
# integer 536365 makes Spark coerce the column to a number: any invoice ID
# that is not purely numeric would become NULL and be silently dropped by
# `!=` (or raise an error when ANSI mode is enabled).
(
    df.where(col("InvoiceNo") != "536365")
    .select("InvoiceNo", "Description")
    .show(5, False)
)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [4]:
# Rows that belong to invoice 536365, printed without truncating cell values.
df.filter(col("InvoiceNo") == "536365").show(5, truncate=False)

# The complementary selection: rows from every other invoice.
df.filter(col("InvoiceNo") != "536365").show(5, truncate=False)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
+---------+-----

In [5]:
from pyspark.sql.functions import instr

# Reusable boolean column expressions:
#   price_filter   - unit price above 600
#   descrip_filter - "POSTAGE" occurs somewhere in the description
#                    (instr returns a 1-based position, 0 when absent)
price_filter = col("UnitPrice") > 600
descrip_filter = instr(df["Description"], "POSTAGE") >= 1

# Chained filters are ANDed together: the stock code must be DOT, and then
# at least one of the two predicates above has to hold.
(
    df.filter(df["StockCode"].isin("DOT"))
    .filter(price_filter | descrip_filter)
    .show(5, False)
)

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description   |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|536544   |DOT      |DOTCOM POSTAGE|1       |2010-12-01 14:32:00|569.77   |null      |United Kingdom|
|536592   |DOT      |DOTCOM POSTAGE|1       |2010-12-01 17:06:00|607.49   |null      |United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [6]:
# Boolean building blocks for the "isExpensive" flag.
dot_code_filter = col("StockCode") == "DOT"
price_filter = col("UnitPrice") > 600
descrip_filter = instr(df["Description"], "POSTAGE") >= 1

# Variant 1: compose the flag from Column expressions, keep only flagged rows.
# NOTE: "unitPrice" differs in case from the schema's "UnitPrice"; the recorded
# output shows it resolving and being used as the displayed header, so it is
# kept as-is (presumably relies on case-insensitive column resolution).
(
    df.withColumn("isExpensive", dot_code_filter & (price_filter | descrip_filter))
    .filter(col("isExpensive"))
    .select("unitPrice", "isExpensive")
    .show()
)

# Variant 2: derive the flag from a SQL expression string instead.
(
    df.withColumn("isExpensive", F.expr("NOT UnitPrice <= 250"))
    .filter(col("isExpensive"))
    .select("unitPrice", "isExpensive")
    .show()
)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [7]:
from pyspark.sql.functions import expr, pow

# Numeric column arithmetic: (Quantity * UnitPrice) squared, plus 5.
line_total = col("Quantity") * col("UnitPrice")
fabricated_quantity = pow(line_total, 2) + 5

# Show the derived value next to the customer id for the first two rows.
(
    df.select(expr("CustomerId"), fabricated_quantity.alias("realQuantity"))
    .show(2)
)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [11]:
# Same "realQuantity" computation, written as a SQL expression string.
real_quantity_sql = "POWER((Quantity * UnitPrice), 2.0) + 5 as realQuantity"
df.selectExpr("CustomerId", real_quantity_sql).show(2)


+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows

