# Initialization

In [1]:
# Making sure to link pyspark to the right Spark folder with findspark
import findspark
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

findspark.init('/opt/spark')
MY_MONGO="mongodb://root:yurt@mongo:27017"
MONGO_URL="mongodb://root:yurt@mongo:27017/local.yieldify?authSource=local"


In [2]:
conf = SparkConf() \
    .set("spark.mongodb.input.uri", "mongodb://root:yurt@mongo:27017/local.yieldify?authSource=local") \
    .set("spark.mongodb.output.uri", "mongodb://root:yurt@mongo:27017/local.yieldify?authSource=local") \
    .setAppName("pysparkYieldData")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)


In [3]:
! hadoop fs -ls hdfs://node-master:9000/user/$USER

Found 1 items
drwxr-xr-x   - root supergroup          0 2021-09-12 09:54 hdfs://node-master:9000/user/root


In [4]:
! ls $PWD/

Dask-Yarn.ipynb		    GeoLite2-Country-CSV_20210831  datasets
GeoLite2-City-CSV_20210831  PySparkYieldDataOld.ipynb	   spark-warehouse


In [5]:
! hadoop fs -put -f $PWD/datasets/input_data.gz


In [None]:
! unzip /root/lab/datasets/GeoLite2-Country-CSV_20210831.zip

In [None]:
! unzip /root/lab/datasets/GeoLite2-City-CSV_20210831.zip

In [None]:
! hadoop fs -put -f $PWD/datasets/GeoLite2-Country-CSV_20210831/GeoLite2-Country-Blocks-IPv4.csv


In [None]:
! hadoop fs -put -f $PWD/datasets/GeoLite2-City-CSV_20210831/GeoLite2-City-Blocks-IPv4.csv

In [None]:
! hadoop fs -put -f $PWD/datasets/GeoLite2-City-CSV_20210831/GeoLite2-City-Locations-en.csv

In [None]:
! hadoop fs -put -f $PWD/datasets/GeoLite2-Country-CSV_20210831/GeoLite2-Country-Locations-en.csv

### Data Files

We can now see it as part of the HDFS

In [6]:
! hadoop fs -ls hdfs://node-master:9000/user/$USER/*

Found 6 items
drwxr-xr-x   - root supergroup          0 2021-09-12 12:24 hdfs://node-master:9000/user/root/.sparkStaging
-rw-r--r--   2 root supergroup  221476423 2021-09-12 09:54 hdfs://node-master:9000/user/root/GeoLite2-City-Blocks-IPv4.csv
-rw-r--r--   2 root supergroup   11952196 2021-09-12 09:54 hdfs://node-master:9000/user/root/GeoLite2-City-Locations-en.csv
-rw-r--r--   2 root supergroup   12829438 2021-09-12 09:54 hdfs://node-master:9000/user/root/GeoLite2-Country-Blocks-IPv4.csv
-rw-r--r--   2 root supergroup       9889 2021-09-12 09:54 hdfs://node-master:9000/user/root/GeoLite2-Country-Locations-en.csv
-rw-r--r--   2 root supergroup    7866234 2021-09-12 12:25 hdfs://node-master:9000/user/root/input_data.gz


In [7]:
! hadoop fs -ls hdfs://node-master:9000/user/root/input_data.gz

-rw-r--r--   2 root supergroup    7866234 2021-09-12 12:25 hdfs://node-master:9000/user/root/input_data.gz


In [8]:
parse_file = sc.textFile("input_data.gz")


In [9]:
parts = parse_file.map(lambda l: l.split("\t"))


In [10]:
from pyspark.sql import Row

user_activity = parts.map(lambda p: Row(
    date=p[0], 
    time=(p[1]),
    user_id=(p[2]),
    url=(p[3]),
    ip=(p[4]),
    user_agent_str=(p[5]),

))


In [11]:
schemaUsers = spark.createDataFrame(user_activity)


In [12]:
schemaUsers.printSchema

<bound method DataFrame.printSchema of DataFrame[date: string, time: string, user_id: string, url: string, ip: string, user_agent_str: string]>

In [13]:
schemaUsers.show()

+----------+--------+--------------------+--------------------+--------------------+--------------------+
|      date|    time|             user_id|                 url|                  ip|      user_agent_str|
+----------+--------+--------------------+--------------------+--------------------+--------------------+
|2014-10-12|17:01:01|f4fdd9e55192e9475...|http://6f2a9cab64...|       94.11.238.152|Mozilla/5.0 (iPad...|
|2014-10-12|17:01:01|0ae53126499336757...|http://8eb4ac417c...|       92.238.71.109|Mozilla/5.0 (iPad...|
|2014-10-12|17:01:01|c5ac174ee153f7e57...|https://1415d3778...|         2.26.44.196|Mozilla/5.0 (Linu...|
|2014-10-12|17:01:01|2d86766f9908fde41...|http://47e1f0cca5...|194.81.33.57, 66....|Mozilla/5.0 (Linu...|
|2014-10-12|17:01:01|3938fffe5c0a131f5...|https://978c17aed...|      109.152.120.12|Mozilla/5.0 (Wind...|
|2014-10-12|17:01:01|88eb65d5f952f3bf5...|http://38d6db9ae3...|         2.28.82.212|Mozilla/5.0 (iPad...|
|2014-10-12|17:01:01|068d17d3e73ea7aac...|http

### Get Browser & OS Family

In [17]:
import pyspark.sql.functions as f


In [75]:
from pyspark.sql.functions import substring_index, trim, concat, to_timestamp, date_format, size
from pyspark.sql.functions import expr, regexp_replace

schemaUsers=schemaUsers.filter(col("user_agent_str").contains(";"))\
    .withColumn("browser", trim(substring_index(schemaUsers.user_agent_str, '(', 1)) )\
    .withColumn("os_long", trim(expr("regexp_replace(user_agent_str,browser,'')")) )\
    .withColumn("os", trim(substring_index( f.split(schemaUsers.user_agent_str, ';')[0], '(', -1)))\
    .withColumn("timestamp", date_format(to_timestamp(concat("date", "time"), 'yyyy-MM-ddHH:mm:ss'), 'yyyy-MM-dd_HH:mm:ss'))




In [76]:
schemaUsers.take(10)

[Row(date='2014-10-12', time='17:01:01', user_id='f4fdd9e55192e94758eb079ec6e24b219fe7d71e', url='http://6f2a9cab64bf88b57447d84129a6531000fa9f78/d47710d5be6e82ffe13a26531f09594351678cf4', ip='94.11.238.152', user_agent_str='Mozilla/5.0 (iPad; CPU OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53', browser='Mozilla/5.0', os_s='iPad; CPU OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 ', os='iPad', timestamp='2014-10-12_17:01:01', osnew=3, os_long='(iPad; CPU OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53', os_longest='Mozilla/5.0', new_col1=' (iPad; CPU OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53'),
 Row(date='2014-10-12', time='17:01:01', user_id='0ae531264993367571e487fb486b13ea412aae3d', url='http://8eb4ac417c48cd837650d956ca5341a83a65929b/4a68df257aab98bb8ce54b224c1fdfcc33b883b6', ip='92.238.71.109', user_agent

In [None]:
import pyspark.sql.functions as f


In [None]:
from pyspark.sql.functions import substring_index, trim, concat, to_timestamp, date_format, size
from pyspark.sql.functions import expr, regexp_replace

schemaUsers=schemaUsers.filter(col("user_agent_str").contains(";"))\
    .withColumn("browser", trim(substring_index(schemaUsers.user_agent_str, '(', 1)) )\
    .withColumn("os_long", trim(expr("regexp_replace(user_agent_str,browser,'')")) )\
    .withColumn("os", trim(substring_index( f.split(schemaUsers.user_agent_str, ';')[0], '(', -1)))\
    .withColumn("timestamp", date_format(to_timestamp(concat("date", "time"), 'yyyy-MM-ddHH:mm:ss'), 'yyyy-MM-dd_HH:mm:ss'))


In [65]:
geo_lookup_df = spark.read.format("mongo").option("uri", mongo_url).load().select("network","city_name","country_name")



In [66]:
geo_lookup_df.show()

+----------+----------+-------------+
|   network| city_name| country_name|
+----------+----------+-------------+
|93.152.208|      null|     Bulgaria|
|93.152.212|      null|United States|
|93.152.214|      null|United States|
|93.152.215|      null|     Bulgaria|
|93.152.216|      null|     Bulgaria|
|93.152.224|      null|     Bulgaria|
|93.154.128|      null|       Poland|
|  93.155.0|      null|       Turkey|
|  93.157.0|    Arnhem|  Netherlands|
| 93.157.16|Volgodonsk|       Russia|
| 93.157.32|      null|  Switzerland|
| 93.157.40|     Biysk|       Russia|
| 93.157.48|      null|      Germany|
| 93.157.56|      null|       Russia|
| 93.157.58|      null|   Uzbekistan|
| 93.157.59|      null|       Russia|
| 93.157.60|      null|       Russia|
| 93.157.88|      null|       Poland|
| 93.157.96|      null|       Poland|
|93.157.104|      Kyiv|      Ukraine|
+----------+----------+-------------+
only showing top 20 rows



Split ip's

In [77]:
split_ip=schemaUsers.select("timestamp", "user_id", "url", "ip", "os", "browser","os_long")\
    .withColumn("ip", f.explode((f.split(schemaUsers['ip'], ','))))


In [78]:
split_ip.take(5)

[Row(timestamp='2014-10-12_17:01:01', user_id='f4fdd9e55192e94758eb079ec6e24b219fe7d71e', url='http://6f2a9cab64bf88b57447d84129a6531000fa9f78/d47710d5be6e82ffe13a26531f09594351678cf4', ip='94.11.238.152', os='iPad', browser='Mozilla/5.0', os_long='(iPad; CPU OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53'),
 Row(timestamp='2014-10-12_17:01:01', user_id='0ae531264993367571e487fb486b13ea412aae3d', url='http://8eb4ac417c48cd837650d956ca5341a83a65929b/4a68df257aab98bb8ce54b224c1fdfcc33b883b6', ip='92.238.71.109', os='iPad', browser='Mozilla/5.0', os_long='(iPad; CPU OS 7_1_2 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) GSA/4.0.1.31280 Mobile/11D257 Safari/9537.53'),
 Row(timestamp='2014-10-12_17:01:01', user_id='c5ac174ee153f7e570b179071f702bacfa347acf', url='https://1415d3778ed2c19885da420ebcd59739f38317a2/b45e2f846bbd24b96e38c84f95a49e2b30dd8258', ip='2.26.44.196', os='Linux', browser='Mozilla/5.0', os_long='(Linux; A

In [79]:
shortened_ip=split_ip\
    .withColumn("ip_shortened", f.split(split_ip['ip'], '\.'))\
    .withColumn("ip_shortened", f.slice("ip_shortened",start=1,length=3))\
    .withColumn("ip_shortened", f.concat_ws(".", ("ip_shortened")))


In [80]:
shortened_ip.show()

+-------------------+--------------------+--------------------+---------------+--------------+-----------+--------------------+------------+
|          timestamp|             user_id|                 url|             ip|            os|    browser|             os_long|ip_shortened|
+-------------------+--------------------+--------------------+---------------+--------------+-----------+--------------------+------------+
|2014-10-12_17:01:01|f4fdd9e55192e9475...|http://6f2a9cab64...|  94.11.238.152|          iPad|Mozilla/5.0|(iPad; CPU OS 7_1...|   94.11.238|
|2014-10-12_17:01:01|0ae53126499336757...|http://8eb4ac417c...|  92.238.71.109|          iPad|Mozilla/5.0|(iPad; CPU OS 7_1...|   92.238.71|
|2014-10-12_17:01:01|c5ac174ee153f7e57...|https://1415d3778...|    2.26.44.196|         Linux|Mozilla/5.0|(Linux; Android 4...|     2.26.44|
|2014-10-12_17:01:01|2d86766f9908fde41...|http://47e1f0cca5...|   194.81.33.57|         Linux|Mozilla/5.0|(Linux; Android 4...|   194.81.33|
|2014-10-12_1

In [81]:
joined_df=shortened_ip.join(geo_lookup_df, geo_lookup_df.network == shortened_ip.ip_shortened, "inner")




In [82]:
joined_df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- os: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- os_long: string (nullable = true)
 |-- ip_shortened: string (nullable = false)
 |-- network: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- country_name: string (nullable = true)



### 1. Top 5 Countries based on number of events

In [87]:
joined_df.filter("country_name is not null")\
    .groupBy("country_name")\
    .count()\
    .sort("count",ascending=False)\
    .show(5)


+--------------+-----+
|  country_name|count|
+--------------+-----+
|United Kingdom|  546|
|       Finland|  308|
| United States|  198|
|       Croatia|   65|
|        France|   62|
+--------------+-----+
only showing top 5 rows



### 2. Top 5 Cities based on number of events

In [88]:
joined_df.filter("city_name is not null")\
    .groupBy("city_name")\
    .count()\
    .sort("count",ascending=False)\
    .show(5)


+------------+-----+
|   city_name|count|
+------------+-----+
|Lappeenranta|  308|
|     Bristol|  121|
|      Zagreb|   65|
|      London|   29|
|      Kaunas|   29|
+------------+-----+
only showing top 5 rows



### 3. Top 5 Browsers based on number of unique users

In [89]:
joined_df.groupBy("browser")\
    .agg(f.countDistinct("user_id"))\
    .sort("count(user_id)",ascending=False)\
    .show(5)


+-----------+--------------+
|    browser|count(user_id)|
+-----------+--------------+
|Mozilla/5.0|           556|
|Mozilla/4.0|            47|
+-----------+--------------+



### 4. Top 5 Operating systems based on number of unique users

In [90]:
joined_df.groupBy( "os_long","os" )\
    .agg(f.countDistinct("user_id"))\
    .sort("count(user_id)",ascending=False)\
    .show(5)


+--------------------+-----------------+--------------+
|             os_long|               os|count(user_id)|
+--------------------+-----------------+--------------+
|(en-us) AppleWebK...|KHTML, like Gecko|           114|
|(Macintosh; Intel...|        Macintosh|            69|
|(iPhone; CPU iPho...|           iPhone|            48|
|(iPhone; CPU iPho...|           iPhone|            37|
|(iPad; CPU OS 8_0...|             iPad|            25|
+--------------------+-----------------+--------------+
only showing top 5 rows



### Analysis

In [None]:
from pyspark.sql.functions import split
unique_ip_addresses=schemaUsers.withColumn('ip', split(schemaUsers['ip'], ', ')[0]).select("ip").distinct()

unique_addresses_formatted = unique_ip_addresses.select(unique_ip_addresses.columns[0])
# unique_ip_addresses=schemaUsers.select("ip").distinct().withColumn('ip', split(schemaUsers['ip'], ', '))

unique_addresses_formatted.count()


In [None]:
unique_ip_addresses=schemaUsers.withColumn('ip', split(schemaUsers['ip'], ', ')[0]).select("ip").distinct()


In [None]:
unique_addresses_formatted.printSchema()

In [None]:
unique_addresses_formatted.write.format("text").mode("Overwrite").save("unique_ip_addresses.txt")

In [None]:
from pymongo import MongoClient
# pprint library is used to make the output look more pretty
from pprint import pprint
# connect to MongoDB, change the << MONGODB URL >> to reflect your own connection string
client = MongoClient('mongodb://root:yurt@mongo:27017/')
db=client.admin
# Issue the serverStatus command and print the results
serverStatusResult=db.command("serverStatus")
pprint(serverStatusResult)


## Twighlight Zone

In [None]:
import geoip2.webservice

# This reader object should be reused across lookups as creation of it is
# expensive.
accid="yurty"
key="yurt"
with geoip2.webservice.Client(accid, key) as client:
    response = client.city('203.0.113.0')
    print(response.country.name)


In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

import requests
api_key="yurt"

# @udf
def get_country_from_ip(ip_address):
    url= f"http://api.ipapi.com/{ip_address}?access_key={api_key}"
    r = requests.get(url)
    ip_response_json=r.json()

    country=ip_response_json['country_name']
    city=ip_response_json['city']
    return (country, city)




In [None]:
! touch maxmind-database.mmdb

In [None]:
! pip install geoip2

### Expensive! But hey..

In [None]:
# unique_ip_addresses.rdd.map(lambda row: row.asDict())


ip_array = [str(row.ip) for row in unique_ip_addresses.collect()]

ip_list=[]
for ip in ip_array:
    if ', ' in ip:
        split_ip=ip.split(', ')
        ip_list.append({ip: get_country_from_ip(split_ip[0])})
        ip_list.append({ip: get_country_from_ip(split_ip[1])})

    else:
        ip_list.append({ip: get_country_from_ip(ip)})
        
len(ip_list)

#### Failed on hitting limit API License for 24 k requests


### Cheaper but time consuming..

### Unique IP Addresses

In [None]:
get_country_from_ip = udf(get_country_from_ip, ArrayType(StringType()))


### Computer says no...

In [None]:
unique_ip_addresses.select(*[get_country_from_ip('ip')])

In [None]:
ip_geo_df = unique_ip_addresses.select(get_country_from_ip("ip").alias("geolocation"))


In [None]:
ip_geo_df.show()

In [None]:
# for ip in unique_ip_addresses_list:
#     print('{}: {}'.format(type(ip), ip))

dict(unique_ip_addresses_list)

In [None]:
import requests
# api_key=""
ip_address="188.141.30.136"

In [None]:
get_ip_details(ip_address)

In [None]:
ip_response_json