## Install Spark

As a requirement you need to have java, you could download it here https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html 

Download the .tgz file from https://spark.apache.org/downloads.html
For these repo we will use spark-2.4.4-bin-hadoop2.7

In our terminal, we need to uncompress the file with the following sentence

```sh
$ tar -xvf spark-2.4.4-bin-hadoop2.7.tgz
```


Copy the folder in `/usr/local` and create the symbolic link.

```sh
$ sudo mv spark-2.4.4-bin-hadoop2.7 /usr/local/
$ sudo ln -s /usr/local/spark-2.1.1-bin-hadoop2.7/ /usr/local/spark
$ cd /usr/local/spark
```

In the `/.bashrc` file add the environment variable SPARK_HOME

```sh
$ export SPARK_HOME=/usr/local/spark
```


In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as func

from datetime import datetime

In [3]:
spark = SparkSession.builder\
                    .appName("Test")\
                    .getOrCreate()

In [4]:
spark

In [5]:
csv_file = '../input/allposts.csv'

In [6]:
df = spark.read.csv(csv_file, sep=',', header=True, inferSchema=True)

In [7]:
df.printSchema()

root
 |-- created_utc: string (nullable = true)
 |-- score: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- ups: string (nullable = true)
 |-- downs: string (nullable = true)
 |-- num_comments: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- link_flair_text: string (nullable = true)
 |-- over_18: string (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- link_flair_css_class: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- is_self: string (nullable = true)
 |-- name: string (nullable = true)
 |-- url: string (nullable = true)
 |-- distinguished: string (nullable = true)



In [8]:
df.select("created_utc").describe().show()

+-------+--------------------+
|summary|         created_utc|
+-------+--------------------+
|  count|             5329559|
|   mean|            Infinity|
| stddev|                 NaN|
|    min| 2007 National S...|
|    max|𝖙𝖍𝖊𝖓 𝖆𝖙 𝖑?...|
+-------+--------------------+



In [9]:
df.select("created_utc").show(100)

+--------------------+
|         created_utc|
+--------------------+
|        1345107919.0|
|I don't want to s...|
|        1349284077.0|
|        1358788919.0|
|        1350053607.0|
|        1339213638.0|
|        1359073848.0|
|        1349368875.0|
|        1353974008.0|
|        1351178477.0|
|        1358774132.0|
|        1353494146.0|
|        1357827163.0|
|        1348106949.0|
|        1350916617.0|
|        1353491546.0|
|        1334266538.0|
|        1350694700.0|
|        1350494396.0|
|        1365604610.0|
|I heard you've ha...|
|   -baringmo et al."|
|        1351352749.0|
|        1357829407.0|
|        1357069866.0|
|        1346469006.0|
|        1340832240.0|
|Just wanted to th...|
|well a month or s...|
|I wake up excited...|
|               Notch|
|        1350149271.0|
|        1358783023.0|
|        1350397799.0|
|        1347519108.0|
|        1334625171.0|
|        1335970085.0|
|        1341776340.0|
|        1358901374.0|
|        1336651971.0|
|        13

In [10]:
df = spark.read.csv(csv_file, sep=',', escape='"', header=True, inferSchema=True, multiLine=True)

In [11]:
df.printSchema()

root
 |-- created_utc: double (nullable = true)
 |-- score: integer (nullable = true)
 |-- domain: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- ups: integer (nullable = true)
 |-- downs: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- permalink: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- link_flair_text: string (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- link_flair_css_class: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- url: string (nullable = true)
 |-- distinguished: string (nullable = true)



In [12]:
df.select(func.col("created_utc")).describe().show()

+-------+--------------------+
|summary|         created_utc|
+-------+--------------------+
|  count|             2266098|
|   mean|1.3506033828245654E9|
| stddev| 2.324046062132212E7|
|    min|        1.14481608E9|
|    max|       1.377012184E9|
+-------+--------------------+



In [13]:
df.select(func.col("created_utc").cast("int")).show(10000)

+-----------+
|created_utc|
+-----------+
| 1345107919|
| 1349284077|
| 1358788919|
| 1350053607|
| 1339213638|
| 1359073848|
| 1349368875|
| 1353974008|
| 1351178477|
| 1358774132|
| 1353494146|
| 1357827163|
| 1348106949|
| 1350916617|
| 1353491546|
| 1334266538|
| 1350694700|
| 1350494396|
| 1365604610|
| 1351352749|
| 1357829407|
| 1357069866|
| 1346469006|
| 1340832240|
| 1350149271|
| 1358783023|
| 1350397799|
| 1347519108|
| 1334625171|
| 1335970085|
| 1341776340|
| 1358901374|
| 1336651971|
| 1354206266|
| 1357554239|
| 1338015284|
| 1338512670|
| 1334158397|
| 1336588734|
| 1347842741|
| 1349525035|
| 1348335568|
| 1365178614|
| 1338218925|
| 1334400937|
| 1358966221|
| 1333985421|
| 1335351460|
| 1337775457|
| 1334071476|
| 1337891404|
| 1348961441|
| 1334172446|
| 1337160350|
| 1335024131|
| 1334845048|
| 1335794568|
| 1348847462|
| 1340586241|
| 1335188428|
| 1358052715|
| 1334236259|
| 1370637957|
| 1358274648|
| 1334024168|
| 1349294631|
| 1337071809|
| 1334361125|
| 1336

In [14]:

df.select(func.col("created_utc").cast("timestamp"))

DataFrame[created_utc: timestamp]

In [15]:
# change format in column created_utc
df = df.withColumn("created_utc",func.col("created_utc").cast("timestamp"))

In [16]:
df.select("edited").show()

+------+
|edited|
+------+
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
| False|
+------+
only showing top 20 rows



In [17]:
df.select("edited").describe().show()

+-------+--------------------+
|summary|              edited|
+-------+--------------------+
|  count|             2266097|
|   mean|1.3605208115215626E9|
| stddev|1.1000832819072876E7|
|    min|        1337039297.0|
|    max|                True|
+-------+--------------------+



In [18]:
df.select("edited").distinct().show()

+------------+
|      edited|
+------------+
|1375743552.0|
|1375981471.0|
|1367545509.0|
|1351029360.0|
|1364353262.0|
|1341868223.0|
|1360627327.0|
|1374071670.0|
|1346417323.0|
|1369958173.0|
|1365212956.0|
|1375991070.0|
|1341543605.0|
|1363117556.0|
|1372268708.0|
|1337433927.0|
|1372038171.0|
|1358798261.0|
|1367661494.0|
|1366564475.0|
+------------+
only showing top 20 rows



In [19]:
# change format to column edited
df = df.withColumn('edited', 
               func.when(func.col('edited').like("1%"), 'True')
               .when(func.col('edited') == '', 'False')
               .otherwise(func.col('edited'))
              )
df = df.withColumn("edited",func.col("edited").cast("boolean"))

In [20]:
df.printSchema()

root
 |-- created_utc: timestamp (nullable = true)
 |-- score: integer (nullable = true)
 |-- domain: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- ups: integer (nullable = true)
 |-- downs: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- permalink: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- link_flair_text: string (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- edited: boolean (nullable = true)
 |-- link_flair_css_class: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- url: string (nullable = true)
 |-- distinguished: string (nullable = true)



In [21]:
#df.select(func.col("edited").cast("boolean")).show(10000)

In [22]:

df.select(func.col("edited")).distinct().show(100)


+------+
|edited|
+------+
|  null|
|  true|
| false|
+------+



In [23]:
df.withColumn('edited', 
               func.when(func.col('edited').like("1%"), 'True')
               .when(func.col('edited') == '', 'False')
               .otherwise(func.col('edited'))
              ).select(func.col("edited")).distinct().show(100)

AnalysisException: "cannot resolve 'CASE WHEN CAST(`edited` AS STRING) LIKE '1%' THEN 'True' WHEN (`edited` = CAST('' AS BOOLEAN)) THEN 'False' ELSE `edited` END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;;\n'Project [created_utc#233, score#129, domain#130, id#131, title#132, ups#133, downs#134, num_comments#135, permalink#136, selftext#137, link_flair_text#138, over_18#139, thumbnail#140, subreddit_id#141, CASE WHEN cast(edited#359 as string) LIKE 1% THEN True WHEN (edited#359 = cast( as boolean)) THEN False ELSE edited#359 END AS edited#387, link_flair_css_class#143, author_flair_css_class#144, is_self#145, name#146, url#147, distinguished\r#148]\n+- Project [created_utc#233, score#129, domain#130, id#131, title#132, ups#133, downs#134, num_comments#135, permalink#136, selftext#137, link_flair_text#138, over_18#139, thumbnail#140, subreddit_id#141, cast(edited#337 as boolean) AS edited#359, link_flair_css_class#143, author_flair_css_class#144, is_self#145, name#146, url#147, distinguished\r#148]\n   +- Project [created_utc#233, score#129, domain#130, id#131, title#132, ups#133, downs#134, num_comments#135, permalink#136, selftext#137, link_flair_text#138, over_18#139, thumbnail#140, subreddit_id#141, CASE WHEN edited#142 LIKE 1% THEN True WHEN (edited#142 = ) THEN False ELSE edited#142 END AS edited#337, link_flair_css_class#143, author_flair_css_class#144, is_self#145, name#146, url#147, distinguished\r#148]\n      +- Project [cast(created_utc#128 as timestamp) AS created_utc#233, score#129, domain#130, id#131, title#132, ups#133, downs#134, num_comments#135, permalink#136, selftext#137, link_flair_text#138, over_18#139, thumbnail#140, subreddit_id#141, edited#142, link_flair_css_class#143, author_flair_css_class#144, is_self#145, name#146, url#147, distinguished\r#148]\n         +- Relation[created_utc#128,score#129,domain#130,id#131,title#132,ups#133,downs#134,num_comments#135,permalink#136,selftext#137,link_flair_text#138,over_18#139,thumbnail#140,subreddit_id#141,edited#142,link_flair_css_class#143,author_flair_css_class#144,is_self#145,name#146,url#147,distinguished\r#148] csv\n"