In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 69 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 26.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=9e984979d5f78b19bd366999afc3d305cbd3a0a5b96afe661ead248ccc39ac43
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this lesson: a local master with
# 4 threads and the web UI on port 4050.
# NOTE(review): with a `local[...]` master the `spark.executor.*` settings below
# are most likely ignored — confirm whether they are still needed.
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName('Lesson_3')
    .config('spark.ui.port', '4050')
    .config('spark.executor.instances', 2)
    .config('spark.executor.memory', '5g')
    .config('spark.executor.cores', 2)
    .getOrCreate()
)

# Low-level context handle, kept for RDD-style APIs.
sc = spark.sparkContext

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Load the raw sales CSV: the first row provides the column names and
# Spark infers each column's type from the data.
data = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('raw_sales.csv')
)

In [None]:
# Preview the first 20 rows (long cell values are truncated).
data.show(n=20, truncate=True)

+-------------------+--------+-------+------------+--------+
|           datesold|postcode|  price|propertyType|bedrooms|
+-------------------+--------+-------+------------+--------+
|2007-02-07 00:00:00|    2607| 525000|       house|       4|
|2007-02-27 00:00:00|    2906| 290000|       house|       3|
|2007-03-07 00:00:00|    2905| 328000|       house|       3|
|2007-03-09 00:00:00|    2905| 380000|       house|       4|
|2007-03-21 00:00:00|    2906| 310000|       house|       3|
|2007-04-04 00:00:00|    2905| 465000|       house|       4|
|2007-04-24 00:00:00|    2607| 399000|       house|       3|
|2007-04-30 00:00:00|    2606|1530000|       house|       4|
|2007-05-24 00:00:00|    2902| 359000|       house|       3|
|2007-05-25 00:00:00|    2906| 320000|       house|       3|
|2007-06-26 00:00:00|    2902| 385000|       house|       3|
|2007-06-27 00:00:00|    2906| 305000|       house|       3|
|2007-06-27 00:00:00|    2612| 850000|       house|       4|
|2007-06-28 00:00:00|   

In [None]:
from pyspark.sql.window import Window

# How many neighbouring sales (within the same postcode) are averaged on
# each side of a row.
N_NEIGHBOURS = 10

# Sales within each postcode, in date order. Several sales can share the same
# `datesold`, and ordering by that column alone leaves the relative order of
# those ties undefined, so the row frames / lag below could differ between runs.
# The remaining columns are added as tie-breakers to make the order total.
windSpec = (
    Window
    .partitionBy('postcode')
    .orderBy('datesold', 'price', 'bedrooms', 'propertyType')
)

table_a = (
    data
    # Mean price of up to N_NEIGHBOURS preceding sales, excluding the current
    # row (null when there is no earlier sale in the postcode).
    .withColumn(
        'avg_before',
        F.avg('price').over(windSpec.rowsBetween(-N_NEIGHBOURS, Window.currentRow - 1)),
    )
    # Mean price of up to N_NEIGHBOURS following sales, excluding the current row.
    .withColumn(
        'avg_after',
        F.avg('price').over(windSpec.rowsBetween(Window.currentRow + 1, N_NEIGHBOURS)),
    )
    # Price of the immediately preceding sale in the same postcode.
    .withColumn('last_sold', F.lag('price', 1).over(windSpec))
)
table_a.show(100)

+-------------------+--------+------+------------+--------+-----------------+---------+---------+
|           datesold|postcode| price|propertyType|bedrooms|       avg_before|avg_after|last_sold|
+-------------------+--------+------+------------+--------+-----------------+---------+---------+
|2007-07-02 00:00:00|    2914|800000|       house|       5|             null| 502800.0|     null|
|2008-06-17 00:00:00|    2914|600000|       house|       4|         800000.0| 486800.0|   800000|
|2008-08-29 00:00:00|    2914|465000|       house|       4|         700000.0| 487800.0|   600000|
|2008-09-02 00:00:00|    2914|541000|       house|       4|621666.6666666666| 481450.0|   465000|
|2008-09-05 00:00:00|    2914|395000|       house|       3|         601500.0| 495950.0|   541000|
|2008-09-05 00:00:00|    2914|552000|       house|       4|         560200.0| 500750.0|   395000|
|2008-09-17 00:00:00|    2914|410000|       house|       3|558833.3333333334| 505350.0|   552000|
|2008-09-26 00:00:00

In [None]:
# Average sale price per calendar year of `datesold`
# (columns: year, avg_year_price).
table_b = (
    data
    .groupBy(F.year('datesold').alias('year'))
    .agg(F.avg('price').alias('avg_year_price'))
)

In [None]:
# Attach the yearly average price to every sale; the left join keeps all rows
# of table_a even if a year were missing from table_b.
result = table_a.join(
    table_b,
    on=F.year(table_a['datesold']) == table_b['year'],
    how='left',
)

In [None]:
# Preview the joined table (first 20 rows, truncated cell values).
result.show(n=20, truncate=True)

+-------------------+--------+------+------------+--------+-----------------+---------+---------+----+-----------------+
|           datesold|postcode| price|propertyType|bedrooms|       avg_before|avg_after|last_sold|year|   avg_year_price|
+-------------------+--------+------+------------+--------+-----------------+---------+---------+----+-----------------+
|2007-07-02 00:00:00|    2914|800000|       house|       5|             null| 502800.0|     null|2007|522377.2108843537|
|2008-06-17 00:00:00|    2914|600000|       house|       4|         800000.0| 486800.0|   800000|2008|493814.1627543036|
|2008-08-29 00:00:00|    2914|465000|       house|       4|         700000.0| 487800.0|   600000|2008|493814.1627543036|
|2008-09-02 00:00:00|    2914|541000|       house|       4|621666.6666666666| 481450.0|   465000|2008|493814.1627543036|
|2008-09-05 00:00:00|    2914|395000|       house|       3|         601500.0| 495950.0|   541000|2008|493814.1627543036|
|2008-09-05 00:00:00|    2914|55

In [None]:
@F.udf(returnType=IntegerType())
def get_unique(value):
    """Spark UDF: number of distinct elements in the array ``value`` (as an int)."""
    distinct_elements = {*value}
    return len(distinct_elements)



# Count the distinct values across all columns of each row. The built-in
# array_distinct + size computes the same count as the `get_unique` UDF
# (a null is kept once, just like None in a Python set) but stays inside the
# JVM instead of serialising every row to a Python worker.
# NOTE(review): F.array coerces all columns to one common type; the output
# (price 800000 vs avg_before 800000.0 counted as different) suggests values
# are compared as strings — confirm that is the intended notion of "unique".
result.withColumn('unique_el', F.size(F.array_distinct(F.array(result.columns)))).show(10)

+-------------------+--------+-------+------------+--------+-----------------+---------+---------+----+-----------------+---------+
|           datesold|postcode|  price|propertyType|bedrooms|       avg_before|avg_after|last_sold|year|   avg_year_price|unique_el|
+-------------------+--------+-------+------------+--------+-----------------+---------+---------+----+-----------------+---------+
|2007-07-02 00:00:00|    2914| 800000|       house|       5|             null| 502800.0|     null|2007|522377.2108843537|        9|
|2008-06-17 00:00:00|    2914| 600000|       house|       4|         800000.0| 486800.0|   800000|2008|493814.1627543036|       10|
|2008-08-29 00:00:00|    2914| 465000|       house|       4|         700000.0| 487800.0|   600000|2008|493814.1627543036|       10|
|2008-09-02 00:00:00|    2914| 541000|       house|       4|621666.6666666666| 481450.0|   465000|2008|493814.1627543036|       10|
|2008-09-05 00:00:00|    2914| 395000|       house|       3|         601500.