In [7]:
# Configuration setup (Note : this is an old configuration, lot of changes in the newer spark version)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [8]:
# Report the directory the kernel is running in; relative paths resolve against it.
import os

cwd = os.getcwd()
print("Current working directory = ", cwd)

Current working directory =  /content


In [9]:
# List the entries of the current working directory ("." is the default path).
os.listdir(".")

['.config',
 'spark-3.1.1-bin-hadoop3.2.tgz',
 'challenge.csv',
 'spark-3.1.1-bin-hadoop3.2',
 '.ipynb_checkpoints',
 'sample_data']

In [None]:
# Read and prep data
from pyspark.sql.types import *

# Explicit schema: three string columns plus an integer Bytes_used column.
schema = StructType([
    StructField('ip_address', StringType()),
    StructField('Country', StringType()),
    StructField('Domain Name', StringType()),
    StructField('Bytes_used', IntegerType())
])
df = spark.read.csv("challenge.csv", header=True, schema=schema)

# A notebook only renders the last expression of a cell, so a bare df.head(6)
# in the middle of the cell was computed and then discarded. show() prints
# the preview explicitly.
df.show(6)
df.printSchema()

In [24]:
from pyspark.sql.functions import *

In [25]:
# Flag every row with 'YES' when its Country is Mexico and 'NO' otherwise.
is_mexico = df["Country"] == "Mexico"
ans_1 = df.withColumn("isCountryMexico", when(is_mexico, "YES").otherwise("NO"))
ans_1.show()

+---------------+--------------+-----------------+----------+---------------+
|     ip_address|       Country|      Domain Name|Bytes_used|isCountryMexico|
+---------------+--------------+-----------------+----------+---------------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|             NO|
| 119.239.207.13|         China|         youtu.be|        51|             NO|
|  68.69.217.210|         China|        adobe.com|        10|             NO|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|             NO|
|   211.13.10.68|     Indonesia|          hud.gov|        29|             NO|
|   239.80.21.97|      Suriname|       smh.com.au|       218|             NO|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|             NO|
| 127.242.24.138|         China| surveymonkey.com|       123|             NO|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|             NO|
|   237.54.11.63|         China|       amazon.com|        83|   

In [26]:
# Total Bytes_used for the Mexico rows versus all other rows.
by_mexico_flag = ans_1.groupBy("isCountryMexico")
ans_2 = by_mexico_flag.sum("Bytes_used")
ans_2.show()

+---------------+---------------+
|isCountryMexico|sum(Bytes_used)|
+---------------+---------------+
|            YES|           6293|
|             NO|         508076|
+---------------+---------------+



In [28]:
# Count the distinct IP addresses seen in each country using countDistinct.
distinct_ips = countDistinct("ip_address").alias("number_of_ips")
ans_3 = ans_1.groupBy("Country").agg(distinct_ips)
ans_3.show()

+-----------+-------------+
|    Country|number_of_ips|
+-----------+-------------+
|       Chad|            1|
|     Russia|           56|
|   Paraguay|            1|
|      Yemen|            1|
|     Sweden|           28|
|Philippines|           65|
|   Malaysia|            5|
|     Turkey|            1|
|     Malawi|            2|
|    Germany|            5|
|    Comoros|            1|
|Afghanistan|            5|
|     Rwanda|            1|
|      Sudan|            1|
|     France|           21|
|     Greece|            8|
|  Sri Lanka|            3|
|   Dominica|            1|
|  Argentina|           14|
|    Belgium|            1|
+-----------+-------------+
only showing top 20 rows



In [30]:
# Add a column to say yes or no to whether the country is Mexico. (Use only Spark SQL)
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it also makes the cell safe to re-run.
df.createOrReplaceTempView('challenge')
sql_ans_1 = spark.sql('''select *, case when Country = 'Mexico' then 'YES' else 'NO' END as isCountryMexico from challenge''')
sql_ans_1.show()

+---------------+--------------+-----------------+----------+---------------+
|     ip_address|       Country|      Domain Name|Bytes_used|isCountryMexico|
+---------------+--------------+-----------------+----------+---------------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|             NO|
| 119.239.207.13|         China|         youtu.be|        51|             NO|
|  68.69.217.210|         China|        adobe.com|        10|             NO|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|             NO|
|   211.13.10.68|     Indonesia|          hud.gov|        29|             NO|
|   239.80.21.97|      Suriname|       smh.com.au|       218|             NO|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|             NO|
| 127.242.24.138|         China| surveymonkey.com|       123|             NO|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|             NO|
|   237.54.11.63|         China|       amazon.com|        83|   

In [31]:
# Group by your new column and sum bytes used. (Use only Spark SQL)
# Expose the flagged result as a view so it can be queried by name.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0.
sql_ans_1.createOrReplaceTempView('updated_challenge')
sql_ans_2 = spark.sql('''
  select
    isCountryMexico,
    sum(bytes_used)
  from updated_challenge
  group by isCountryMexico
''')
sql_ans_2.show()

+---------------+---------------+
|isCountryMexico|sum(bytes_used)|
+---------------+---------------+
|            YES|           6293|
|             NO|         508076|
+---------------+---------------+



In [33]:
# Per country, count distinct IP addresses alongside the plain row count. (Use only Spark SQL)
# Reads from the 'updated_challenge' view, which must already be registered.
country_ip_query = '''
  select
    country,
    count(distinct(ip_address)),
    count(ip_address)
  from updated_challenge
  group by country
'''
sql_ans_3 = spark.sql(country_ip_query)
sql_ans_3.show()


+-----------+--------------------------+-----------------+
|    country|count(DISTINCT ip_address)|count(ip_address)|
+-----------+--------------------------+-----------------+
|       Chad|                         1|                1|
|     Russia|                        56|               56|
|   Paraguay|                         1|                1|
|      Yemen|                         1|                1|
|     Sweden|                        28|               28|
|Philippines|                        65|               65|
|   Malaysia|                         5|                5|
|     Turkey|                         1|                1|
|     Malawi|                         2|                2|
|    Germany|                         5|                5|
|    Comoros|                         1|                1|
|Afghanistan|                         5|                5|
|     Rwanda|                         1|                1|
|      Sudan|                         1|                