# PySpark IP Address Geo Lookup
- Code adapted from https://github.com/adampolomski/prism/blob/master/tools/wppl.py

## Install GeoIP2 Along With PySpark
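- `pip install geoip2` (preferably inside a virtualenv rather than with `sudo`); the notebook also expects the `GeoLite2-City.mmdb` database file from MaxMind in the working directory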
- http://geoip2.readthedocs.io/en/latest/#city-database

## Import Needed PySpark Modules

In [12]:
import geoip2.database
import geoip2.errors
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType

## Initialize PySpark
- `local[*]` runs Spark locally with one worker thread per logical core on your machine.

In [2]:
# Single source of truth for the Spark configuration: run locally on every
# logical core, under one application name.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("pysparkIPAddressGeoLookup")
)

# Let SparkSession own the SparkContext. getOrCreate() reuses an already
# running session/context, so re-running this cell does not fail with
# "Cannot run multiple SparkContexts at once" as a direct
# SparkContext(conf=conf) call would. It also avoids a second, conflicting
# app name being set through a separate builder.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Legacy entry point, kept for code that still expects `sqlContext`.
sqlContext = SQLContext(sc)

## Verify that GeoIP2 is imported and working

In [3]:
# Smoke test: open the local GeoLite2 City database and resolve one known
# public IP address, then dump the full geoip2.models.City record.
sample_ip = "5.153.63.162"
cityReader = geoip2.database.Reader(r'GeoLite2-City.mmdb')
city = cityReader.city(sample_ip)
print(city)

geoip2.models.City({'city': {'geoname_id': 2759794, 'names': {'de': 'Amsterdam', 'en': 'Amsterdam', 'es': 'Ámsterdam', 'fr': 'Amsterdam', 'ja': 'アムステルダム', 'pt-BR': 'Amesterdão', 'ru': 'Амстердам', 'zh-CN': '阿姆斯特丹'}}, 'continent': {'code': 'EU', 'geoname_id': 6255148, 'names': {'de': 'Europa', 'en': 'Europe', 'es': 'Europa', 'fr': 'Europe', 'ja': 'ヨーロッパ', 'pt-BR': 'Europa', 'ru': 'Европа', 'zh-CN': '欧洲'}}, 'country': {'geoname_id': 2750405, 'is_in_european_union': True, 'iso_code': 'NL', 'names': {'de': 'Niederlande', 'en': 'Netherlands', 'es': 'Holanda', 'fr': 'Pays-Bas', 'ja': 'オランダ王国', 'pt-BR': 'Holanda', 'ru': 'Нидерланды', 'zh-CN': '荷兰'}}, 'location': {'accuracy_radius': 100, 'latitude': 52.3556, 'longitude': 4.9135, 'time_zone': 'Europe/Amsterdam'}, 'postal': {'code': '1091'}, 'registered_country': {'geoname_id': 2750405, 'is_in_european_union': True, 'iso_code': 'NL', 'names': {'de': 'Niederlande', 'en': 'Netherlands', 'es': 'Holanda', 'fr': 'Pays-Bas', 'ja': 'オランダ王国', 'pt-BR': '

## Create an Example Spark DataFrame
- user_id
- ip_address

In [21]:
# Toy input: one row per user, holding the IP address to geolocate.
data = [(0, "5.153.63.162"), (1, "159.8.223.72"), (2, "169.38.84.49"), (3, "23.246.195.8"), (4, "158.176.86.249")]
events_df = spark.createDataFrame(data, ["user_id", "ip_address"])

# show() prints the table itself and returns None. Wrapping it in print()
# would emit a stray "None" after the table.
events_df.show()

+-------+--------------+
|user_id|    ip_address|
+-------+--------------+
|      0|  5.153.63.162|
|      1|  159.8.223.72|
|      2|  169.38.84.49|
|      3|  23.246.195.8|
|      4|158.176.86.249|
+-------+--------------+

None


## Define a function that yields rows enriched with geo information

In [17]:
GEOLITE2_CITY_DB = r'GeoLite2-City.mmdb'

# Driver-side reader, kept for interactive lookups in this notebook.
# asEnrichedRow below opens its own reader per partition instead.
cityReader = geoip2.database.Reader(GEOLITE2_CITY_DB)

def asEnrichedRow(lines, db_path=GEOLITE2_CITY_DB):
    """Yield a geo-enriched Row for every input row whose IP address resolves.

    Intended for ``rdd.mapPartitions``: it is called once per partition with
    an iterator of Rows exposing ``user_id`` and ``ip_address``.

    The GeoLite2 reader is opened here, once per partition, rather than
    captured from the driver. The lookup then runs where the partition is
    processed, no open database handle has to travel with the serialized
    function, and the reader is closed when the partition is exhausted or
    the generator is closed early.

    Rows are skipped (not yielded) when ``ip_address`` is missing or empty,
    is not a valid IP address, or is not present in the database. Any other
    error propagates, so genuine bugs are not silently hidden.

    :param lines: iterator of Rows with ``user_id`` and ``ip_address`` fields.
    :param db_path: path to the GeoLite2 City ``.mmdb`` file. It must be
        readable wherever the partition is processed.
    :return: generator of Rows with ``user_id``, ``ip_address``,
        ``local_timezone``, ``city``, ``country``, ``latitude`` and
        ``longitude`` fields (geo fields may be None when the database has
        no value for them).
    """
    with geoip2.database.Reader(db_path) as reader:
        for line in lines:
            ip_address = line["ip_address"]
            if not ip_address:
                # None / empty string: nothing to look up.
                continue
            try:
                city = reader.city(ip_address)
            except (geoip2.errors.AddressNotFoundError, ValueError):
                # Unknown (e.g. private) or malformed address: enrichment is
                # best-effort, so drop the row instead of failing the job.
                continue
            yield Row(
                user_id=line["user_id"],
                ip_address=ip_address,
                local_timezone=city.location.time_zone,
                city=city.city.name,
                country=city.country.name,
                latitude=city.location.latitude,
                longitude=city.location.longitude)

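## Use PySpark to call the function above and extract geo information
- `mapPartitions()` calls the function once per partition with an iterator over that partition's rows, whereas `map()` calls a function once per row
    - Related discussion (mapPartitions vs. foreachPartition): https://stackoverflow.com/questions/49142373/what-is-the-difference-between-mappartitions-and-foreachpartition-in-apache-spar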

In [18]:
# Explicit output schema. Unlike toDF() type inference, it also works when
# every lookup failed (empty RDD) or a column is null in every row (e.g. no
# address resolved to a city). In both cases inference raises ValueError.
enriched_events_schema = StructType([
    StructField("user_id", LongType(), True),
    StructField("ip_address", StringType(), True),
    StructField("local_timezone", StringType(), True),
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
])

# asEnrichedRow runs once per partition. The map then pins the column order
# explicitly so it lines up with the schema, because the field order of
# Row(**kwargs) differs between Spark versions.
enriched_events_rdd = (
    events_df.rdd
    .mapPartitions(asEnrichedRow)
    .map(lambda r: (
        r.user_id,
        r.ip_address,
        r.local_timezone,
        r.city,
        r.country,
        r.latitude,
        r.longitude))
)

enriched_events_df = spark.createDataFrame(enriched_events_rdd, schema=enriched_events_schema)

## Finally, we have a PySpark DataFrame with geo information

In [19]:
# Print the first 20 enriched rows, truncating long cell values
# (the same as show()'s defaults, spelled out).
enriched_events_df.show(n=20, truncate=True)

+-------+--------------+----------------+---------+-------------+--------+---------+
|user_id|    ip_address|  local_timezone|     city|      country|latitude|longitude|
+-------+--------------+----------------+---------+-------------+--------+---------+
|      0|  5.153.63.162|Europe/Amsterdam|Amsterdam|  Netherlands| 52.3556|   4.9135|
|      1|  159.8.223.72|Europe/Amsterdam|Amsterdam|  Netherlands| 52.3556|   4.9135|
|      2|  169.38.84.49|    Asia/Kolkata|  Chennai|        India| 13.0833|  80.2833|
|      3|  23.246.195.8| America/Chicago|   Dallas|United States| 32.7787| -96.8217|
|      4|158.176.86.249|            null|     null|United States|  37.751|  -97.822|
+-------+--------------+----------------+---------+-------------+--------+---------+

