### Importing libraries and initializing Spark context

In [1]:
import findspark
findspark.init('/usr/local/spark')
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.executor.memory","25g").config("spark.driver.memory","25g").config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","32g").getOrCreate()

### Loading .csv files into individual dataframes

In [3]:
filePath_1gb = "./CSV-Files/nasa_logs_1GB.csv"
df_1gb = spark.read.format('csv').option("header","false").option("inferSchema","true").load(filePath_1gb)

In [10]:
filePath_5gb = "./CSV-Files/nasa_logs_5GB.csv"
df_5gb = spark.read.format('csv').option("header","false").option("inferSchema","true").load(filePath_5gb)

In [11]:
filePath_10gb = "./CSV-Files/nasa_logs_10GB.csv"
df_10gb = spark.read.format('csv').option("header","false").option("inferSchema","true").load(filePath_10gb)

In [12]:
%%time
filePath_15gb = "./CSV-Files/nasa_logs_15GB.csv"
df_15gb = spark.read.format('csv').option("header","false").option("inferSchema","true").load(filePath_15gb)

CPU times: user 15.3 ms, sys: 6.39 ms, total: 21.7 ms
Wall time: 2min 4s


### Displaying total number of loaded records in each dataframe

In [76]:
df_1gb.count()

13846448

In [77]:
df_5gb.count()

58847404

In [78]:
df_10gb.count()

114233196

In [79]:
df_15gb.count()

173080600

### Renaming column names into meaningful names

In [14]:
df_1gb = df_1gb.withColumnRenamed("_c0","host") \
                .withColumnRenamed("_c1","method") \
                .withColumnRenamed("_c2","endpoint") \
                .withColumnRenamed("_c3","protocol") \
                .withColumnRenamed("_c4","status") \
                .withColumnRenamed("_c5","object_size") \
                .withColumnRenamed("_c6","timestamp")

In [15]:
df_5gb = df_5gb.withColumnRenamed("_c0","host") \
                .withColumnRenamed("_c1","method") \
                .withColumnRenamed("_c2","endpoint") \
                .withColumnRenamed("_c3","protocol") \
                .withColumnRenamed("_c4","status") \
                .withColumnRenamed("_c5","object_size") \
                .withColumnRenamed("_c6","timestamp")

In [16]:
df_10gb = df_10gb.withColumnRenamed("_c0","host") \
                .withColumnRenamed("_c1","method") \
                .withColumnRenamed("_c2","endpoint") \
                .withColumnRenamed("_c3","protocol") \
                .withColumnRenamed("_c4","status") \
                .withColumnRenamed("_c5","object_size") \
                .withColumnRenamed("_c6","timestamp")

In [17]:
df_15gb = df_15gb.withColumnRenamed("_c0","host") \
                .withColumnRenamed("_c1","method") \
                .withColumnRenamed("_c2","endpoint") \
                .withColumnRenamed("_c3","protocol") \
                .withColumnRenamed("_c4","status") \
                .withColumnRenamed("_c5","object_size") \
                .withColumnRenamed("_c6","timestamp")

### Converting dataframes into Parquet files

In [18]:
# df_1gb.write.parquet("./Parquet-Files/nasa_logs_1GB.parquet")

AnalysisException: path file:/home/ubuntu/dataset/nasa_logs_1GB.parquet already exists.;

In [86]:
# df_5gb.write.parquet("nasa_logs_5GB.parquet")

In [87]:
# df_10gb.write.parquet("nasa_logs_10GB.parquet")

In [88]:
# df_15gb.write.parquet("nasa_logs_15GB.parquet")

### Loading Parquet files into dataframes to be able to query them

In [19]:
prqPath_1gb = spark.read.parquet("nasa_logs_1GB.parquet")

In [20]:
prqPath_5gb = spark.read.parquet("nasa_logs_5GB.parquet")

In [21]:
prqPath_10gb = spark.read.parquet("nasa_logs_10GB.parquet")

In [22]:
prqPath_15gb = spark.read.parquet("nasa_logs_15GB.parquet")

### Creating a view from dataframes to a meaningful name that can be used in the queries

In [23]:
prqPath_1gb.createOrReplaceTempView("http_logs_prq_1gb")

In [24]:
prqPath_5gb.createOrReplaceTempView("http_logs_prq_5gb")

In [25]:
prqPath_10gb.createOrReplaceTempView("http_logs_prq_10gb")

In [26]:
prqPath_15gb.createOrReplaceTempView("http_logs_prq_15gb")

### Query 1: Count the number of records

In [27]:
%%time
query1_1gb = spark.sql("select count(*) from http_logs_prq_1gb")
query1_1gb.show()

CPU times: user 1.46 ms, sys: 0 ns, total: 1.46 ms
Wall time: 29.8 ms


In [28]:
%%time
query1_5gb = spark.sql("select count(*) from http_logs_prq_5gb")
query1_5gb.show()

CPU times: user 1.18 ms, sys: 200 µs, total: 1.38 ms
Wall time: 6.24 ms


In [29]:
%%time
query1_10gb = spark.sql("select count(*) from http_logs_prq_10gb")
query1_10gb.show()

CPU times: user 1.17 ms, sys: 198 µs, total: 1.37 ms
Wall time: 5.57 ms


In [30]:
%%time
query1_15gb = spark.sql("select count(*) from http_logs_prq_15gb")
query1_15gb.show()

CPU times: user 1.3 ms, sys: 217 µs, total: 1.51 ms
Wall time: 5.46 ms


### Query 2: 

In [148]:
%%time
query2_1gb = spark.sql("SELECT endpoint, COUNT(*) AS page_view_count FROM http_logs_prq_1gb \
                        GROUP BY endpoint \
                        ORDER BY page_view_count DESC LIMIT 5")
query2_1gb.show()

+--------------------+---------------+
|            endpoint|page_view_count|
+--------------------+---------------+
|/images/NASA-logo...|         834856|
|/images/KSC-logos...|         659880|
|/images/MOSAIC-lo...|         511632|
|/images/USA-logos...|         508296|
|/images/WORLD-log...|         503700|
+--------------------+---------------+

CPU times: user 1.95 ms, sys: 283 µs, total: 2.24 ms
Wall time: 989 ms


In [149]:
%%time
query2_5gb = spark.sql("SELECT endpoint, COUNT(*) AS page_view_count FROM http_logs_prq_5gb \
                        GROUP BY endpoint \
                        ORDER BY page_view_count DESC LIMIT 5")
query2_5gb.show()

+--------------------+---------------+
|            endpoint|page_view_count|
+--------------------+---------------+
|/images/NASA-logo...|        3548138|
|/images/KSC-logos...|        2804490|
|/images/MOSAIC-lo...|        2174436|
|/images/USA-logos...|        2160258|
|/images/WORLD-log...|        2140725|
+--------------------+---------------+

CPU times: user 2.5 ms, sys: 0 ns, total: 2.5 ms
Wall time: 2.17 s


In [155]:
%%time
query2_10gb = spark.sql("SELECT endpoint, COUNT(*) AS page_view_count FROM http_logs_prq_10gb \
                        GROUP BY endpoint \
                        ORDER BY page_view_count DESC LIMIT 5")
query2_10gb.show()

+--------------------+---------------+
|            endpoint|page_view_count|
+--------------------+---------------+
|/images/NASA-logo...|        6887562|
|/images/KSC-logos...|        5444010|
|/images/MOSAIC-lo...|        4220964|
|/images/USA-logos...|        4193442|
|/images/WORLD-log...|        4155525|
+--------------------+---------------+

CPU times: user 2.73 ms, sys: 399 µs, total: 3.13 ms
Wall time: 4.48 s


In [161]:
%%time
query2_15gb = spark.sql("SELECT endpoint, COUNT(*) AS page_view_count FROM http_logs_prq_15gb \
                        GROUP BY endpoint \
                        ORDER BY page_view_count DESC LIMIT 5")
query2_15gb.show()

+--------------------+---------------+
|            endpoint|page_view_count|
+--------------------+---------------+
|/images/NASA-logo...|       10435700|
|/images/KSC-logos...|        8248500|
|/images/MOSAIC-lo...|        6395400|
|/images/USA-logos...|        6353700|
|/images/WORLD-log...|        6296250|
+--------------------+---------------+

CPU times: user 2.92 ms, sys: 434 µs, total: 3.36 ms
Wall time: 6.61 s


### Query 3:

In [169]:
%%time
query3_1gb = spark.sql("SELECT status, count(status) AS distinct_status FROM http_logs_prq_1gb \
                        WHERE status >= '400' \
                        GROUP BY status \
                        ORDER BY distinct_status DESC")
query3_1gb.show()

+------+---------------+
|status|distinct_status|
+------+---------------+
|   404|          83596|
|   403|            900|
|   500|            260|
|   501|            164|
|   400|             60|
+------+---------------+

CPU times: user 2.08 ms, sys: 303 µs, total: 2.39 ms
Wall time: 992 ms


In [170]:
%%time
query3_5gb = spark.sql("SELECT status, count(status) AS distinct_status FROM http_logs_prq_5gb \
                        WHERE status >= '400' \
                        GROUP BY status \
                        ORDER BY distinct_status DESC")
query3_5gb.show()

+------+---------------+
|status|distinct_status|
+------+---------------+
|   404|         355283|
|   403|           3825|
|   500|           1105|
|   501|            697|
|   400|            255|
+------+---------------+

CPU times: user 2.53 ms, sys: 1 µs, total: 2.53 ms
Wall time: 366 ms


In [171]:
%%time
query3_10gb = spark.sql("SELECT status, count(status) AS distinct_status FROM http_logs_prq_10gb \
                        WHERE status >= '400' \
                        GROUP BY status \
                        ORDER BY distinct_status DESC")
query3_10gb.show()

+------+---------------+
|status|distinct_status|
+------+---------------+
|   404|         689667|
|   403|           7425|
|   500|           2145|
|   501|           1353|
|   400|            495|
+------+---------------+

CPU times: user 2.93 ms, sys: 0 ns, total: 2.93 ms
Wall time: 557 ms


In [172]:
%%time
query3_15gb = spark.sql("SELECT status, count(status) AS distinct_status FROM http_logs_prq_15gb \
                        WHERE status >= '400' \
                        GROUP BY status \
                        ORDER BY distinct_status DESC")
query3_15gb.show()

+------+---------------+
|status|distinct_status|
+------+---------------+
|   404|        1044950|
|   403|          11250|
|   500|           3250|
|   501|           2050|
|   400|            750|
+------+---------------+

CPU times: user 2.2 ms, sys: 0 ns, total: 2.2 ms
Wall time: 659 ms


### Query 4:

In [174]:
%%time
query4_1gb = spark.sql("SELECT endpoint, count(endpoint) AS count_of_requests \
                        FROM http_logs_prq_1gb WHERE status >= '400' \
                        GROUP BY endpoint \
                        ORDER BY count_of_requests DESC \
                        LIMIT 5")
query4_1gb.show()

+--------------------+-----------------+
|            endpoint|count_of_requests|
+--------------------+-----------------+
|/pub/winvn/readme...|             8016|
|/pub/winvn/releas...|             6928|
|/shuttle/missions...|             2732|
|/shuttle/missions...|             1712|
|/history/apollo/a...|             1536|
+--------------------+-----------------+

CPU times: user 3.27 ms, sys: 0 ns, total: 3.27 ms
Wall time: 511 ms


In [175]:
%%time
query4_5gb = spark.sql("SELECT endpoint, count(endpoint) AS count_of_requests \
                        FROM http_logs_prq_5gb WHERE status >= '400' \
                        GROUP BY endpoint \
                        ORDER BY count_of_requests DESC \
                        LIMIT 5")
query4_5gb.show()

+--------------------+-----------------+
|            endpoint|count_of_requests|
+--------------------+-----------------+
|/pub/winvn/readme...|            34068|
|/pub/winvn/releas...|            29444|
|/shuttle/missions...|            11611|
|/shuttle/missions...|             7276|
|/history/apollo/a...|             6528|
+--------------------+-----------------+

CPU times: user 2.4 ms, sys: 0 ns, total: 2.4 ms
Wall time: 647 ms


In [176]:
%%time
query4_10gb = spark.sql("SELECT endpoint, count(endpoint) AS count_of_requests \
                        FROM http_logs_prq_10gb WHERE status >= '400' \
                        GROUP BY endpoint \
                        ORDER BY count_of_requests DESC \
                        LIMIT 5")
query4_10gb.show()

+--------------------+-----------------+
|            endpoint|count_of_requests|
+--------------------+-----------------+
|/pub/winvn/readme...|            66132|
|/pub/winvn/releas...|            57156|
|/shuttle/missions...|            22539|
|/shuttle/missions...|            14124|
|/history/apollo/a...|            12672|
+--------------------+-----------------+

CPU times: user 2.33 ms, sys: 0 ns, total: 2.33 ms
Wall time: 1.05 s


In [177]:
%%time
query4_15gb = spark.sql("SELECT endpoint, count(endpoint) AS count_of_requests \
                        FROM http_logs_prq_15gb WHERE status >= '400' \
                        GROUP BY endpoint \
                        ORDER BY count_of_requests DESC \
                        LIMIT 5")
query4_15gb.show()

+--------------------+-----------------+
|            endpoint|count_of_requests|
+--------------------+-----------------+
|/pub/winvn/readme...|           100200|
|/pub/winvn/releas...|            86600|
|/shuttle/missions...|            34150|
|/shuttle/missions...|            21400|
|/history/apollo/a...|            19200|
+--------------------+-----------------+

CPU times: user 2.06 ms, sys: 175 µs, total: 2.23 ms
Wall time: 1.36 s


### Query 5:

In [31]:
%%time
query5_1gb = spark.sql("SELECT DISTINCT(endpoint), timestamp, ROUND((object_size * 0.000001)) AS SIZE_IN_MB \
                        FROM http_logs_prq_1gb \
                        ORDER BY SIZE_IN_MB DESC \
                        LIMIT 10")
query5_1gb.show()

+--------------------+-------------------+----------+
|            endpoint|          timestamp|SIZE_IN_MB|
+--------------------+-------------------+----------+
|/shuttle/countdow...|1995-07-07 14:03:32|         7|
|/statistics/1995/...|1995-08-15 20:37:24|         3|
|/statistics/1995/...|1995-07-07 10:28:56|         3|
|/statistics/1995/...|1995-07-06 10:19:00|         3|
|/statistics/1995/...| 1995-08-21 8:43:56|         3|
|/statistics/1995/...| 1995-07-09 9:22:14|         3|
|/mdss/ped/acs/SDP.ps|1995-07-11 17:29:34|         3|
|/statistics/1995/...| 1995-07-05 8:57:07|         3|
|/statistics/1995/...|1995-08-07 18:28:57|         3|
|/statistics/1995/...|1995-08-03 15:51:23|         3|
+--------------------+-------------------+----------+

CPU times: user 2.9 ms, sys: 480 µs, total: 3.38 ms
Wall time: 10.3 s


In [32]:
%%time
query5_5gb = spark.sql("SELECT DISTINCT(endpoint), timestamp, ROUND((object_size * 0.000001)) AS SIZE_IN_MB \
                        FROM http_logs_prq_5gb \
                        ORDER BY SIZE_IN_MB DESC \
                        LIMIT 10")
query5_5gb.show()

+--------------------+-------------------+----------+
|            endpoint|          timestamp|SIZE_IN_MB|
+--------------------+-------------------+----------+
|/shuttle/countdow...|1995-07-07 14:03:32|         7|
|/statistics/1995/...|1995-08-15 20:37:24|         3|
|/statistics/1995/...|1995-07-07 10:28:56|         3|
|/statistics/1995/...|1995-07-06 10:19:00|         3|
|/statistics/1995/...| 1995-08-21 8:43:56|         3|
|/statistics/1995/...| 1995-07-09 9:22:14|         3|
|/mdss/ped/acs/SDP.ps|1995-07-11 17:29:34|         3|
|/statistics/1995/...| 1995-07-05 8:57:07|         3|
|/statistics/1995/...|1995-08-07 18:28:57|         3|
|/statistics/1995/...|1995-08-03 15:51:23|         3|
+--------------------+-------------------+----------+

CPU times: user 0 ns, sys: 4.29 ms, total: 4.29 ms
Wall time: 12.2 s


In [34]:
%%time
query5_10gb = spark.sql("SELECT DISTINCT(endpoint), timestamp, ROUND((object_size * 0.000001)) AS SIZE_IN_MB \
                        FROM http_logs_prq_10gb \
                        ORDER BY SIZE_IN_MB DESC \
                        LIMIT 10")
query5_10gb.show()

+--------------------+-------------------+----------+
|            endpoint|          timestamp|SIZE_IN_MB|
+--------------------+-------------------+----------+
|/shuttle/countdow...|1995-07-07 14:03:32|         7|
|/statistics/1995/...|1995-08-15 20:37:24|         3|
|/statistics/1995/...|1995-07-07 10:28:56|         3|
|/statistics/1995/...|1995-07-06 10:19:00|         3|
|/statistics/1995/...| 1995-08-21 8:43:56|         3|
|/statistics/1995/...| 1995-07-09 9:22:14|         3|
|/mdss/ped/acs/SDP.ps|1995-07-11 17:29:34|         3|
|/statistics/1995/...| 1995-07-05 8:57:07|         3|
|/statistics/1995/...|1995-08-07 18:28:57|         3|
|/statistics/1995/...|1995-08-03 15:51:23|         3|
+--------------------+-------------------+----------+

CPU times: user 4.28 ms, sys: 362 µs, total: 4.64 ms
Wall time: 23.1 s


In [36]:
%%time
query5_15gb = spark.sql("SELECT DISTINCT(endpoint), timestamp, ROUND((object_size * 0.000001)) AS SIZE_IN_MB \
                        FROM http_logs_prq_5gb \
                        ORDER BY SIZE_IN_MB DESC \
                        LIMIT 10")
query5_15gb.show()

+--------------------+-------------------+----------+
|            endpoint|          timestamp|SIZE_IN_MB|
+--------------------+-------------------+----------+
|/shuttle/countdow...|1995-07-07 14:03:32|         7|
|/statistics/1995/...|1995-08-15 20:37:24|         3|
|/statistics/1995/...|1995-07-07 10:28:56|         3|
|/statistics/1995/...|1995-07-06 10:19:00|         3|
|/statistics/1995/...| 1995-08-21 8:43:56|         3|
|/statistics/1995/...| 1995-07-09 9:22:14|         3|
|/mdss/ped/acs/SDP.ps|1995-07-11 17:29:34|         3|
|/statistics/1995/...| 1995-07-05 8:57:07|         3|
|/statistics/1995/...|1995-08-07 18:28:57|         3|
|/statistics/1995/...|1995-08-03 15:51:23|         3|
+--------------------+-------------------+----------+

CPU times: user 3.98 ms, sys: 413 µs, total: 4.39 ms
Wall time: 10.5 s


In [4]:
df_1gb.write.orc("./ORC-Files/nasa_logs_1GB")