In [1]:
from datetime import date, timedelta, datetime
import pandas

def int2date(n):
    """
    This function converts a number of days since Jan 1st 1970 <n> to a date.
    """
    return date(1970,1,1)+timedelta(days=n)

def date2int(d):
    """
    This function converts a date <d> to number of days since Jan 1st 1970.
    """
    return (d-date(1970,1,1)).days

def str2date(s, f="%Y%m%d"):
    """
    This function converts a string <s> in the format <f> to a date.
    """
    return datetime.strptime(s, f).date()

In [2]:
#TODAY_INT = date2int(date.today())                        # today (day notebook is run) as days since Jan 1st, 1970
PCD_CUTOUT_START_INT = date2int(date(2010, 1, 1))  # profiles created before this date are removed (days since Jan 1st, 1970)
SD_CUTOUT_START_STR = "20160801"                        # pings submitted before this date are removed
SD_CUTOUT_END_STR = "20161231"                        # pings submitted before this date are removed
cities = ["London",
         "Cardiff",
         "Manchester",
         "Liverpool",
         "Dublin",
         "Glasgow",
         "Edinburgh",
         "Paris",
         "Nice",
         "Lille",
         "Bordeaux",
         "Lyon",
         "Marseille",
         "Madrid",
         "Barcelona",
         "Valencia",
         "Sevilla",
         "Lisbon",
         "Porto",
         "Bern",
         "Geneva",
         "Berlin",
         "Frankfurt",
         "Munich",
         "Cologne",
         "Hamburg",
         "Amsterdam",
         "Brussels",
         "Milan",
         "Rome",
         "Vienna",
         "Prague",
         "Athens",
         "Copenhagen",
         "Warsaw",
         "Budapest",
         "Sofia",
         "Bucharest",
         "Sarajevo",
         "Zazgreb",
         "Ljubjana",
         "Belgrade"]
countries = ["GB",
            "FR",
            "ES",
            "PT",
            "IE",
            "CH",
            "DE",
            "NL",
            "IT",
            "BE",
            "DK",
            "CZ",
            "GR",
            "PL",
            "RO",
            "HR",
            "HU",
            "AT",
            "RS",
            "BG",
            "BA"]

In [8]:
lat_lng = pandas.read_csv("simplemaps-worldcities-basic.csv")
lat_lng = lat_lng.filter(items = ['iso2', 'city', 'lat', 'lng'])
lat_lng = lat_lng[(lat_lng.iso2.isin(countries) & lat_lng.city.isin(cities))]
lat_lng.columns = ["country", "city", "lat", "lng"]

In [9]:
lat_lng

Unnamed: 0,country,city,lat,lng
603,AT,Vienna,48.200015,16.366639
663,BE,Brussels,50.833317,4.333317
759,BA,Sarajevo,43.850022,18.383002
1190,BG,Sofia,42.683349,23.316654
2313,CZ,Prague,50.083337,14.46598
2598,FR,Nice,43.715018,7.265024
2601,FR,Lille,50.649969,3.080008
2605,FR,Bordeaux,44.850013,-0.595013
2606,FR,Marseille,43.289979,5.37501
2609,FR,Lyon,45.770009,4.83003


In [10]:
lat_lng = sqlContext.createDataFrame(lat_lng)

In [5]:
# connect to the main_summary dataset
allPingsDF = sqlContext.read.load("s3://telemetry-parquet/main_summary/v3", "parquet", mergeSchema=True)

# perform variable selection with column renaming
allPingsDFSelect = allPingsDF.select(
           allPingsDF.client_id.alias("cid"),
           allPingsDF.sample_id.alias("sid"),
           allPingsDF.normalized_channel.alias("channel"),
           allPingsDF.submission_date_s3.alias("sd"),
           allPingsDF.app_name.alias("appname"),
           allPingsDF.app_version[:2].alias("appversion"),
           allPingsDF.country,
           allPingsDF.city,
           allPingsDF.os,
           allPingsDF.os_version)

In [14]:
filteredPingsDF = allPingsDFSelect.filter(allPingsDFSelect.sid == "42")\
                                  .filter(allPingsDFSelect.channel == "release")\
                                  .filter(allPingsDFSelect.appname == "Firefox")\
                                  .filter(allPingsDFSelect.country.isin(countries))\
                                  .filter(allPingsDFSelect.city.isin(cities))\
                                  .filter(allPingsDFSelect.os.isin("Windows_NT", "Darwin", "Linux"))\
                                  .filter(allPingsDFSelect.sd >= SD_CUTOUT_START_STR)\
                                  .filter(allPingsDFSelect.sd <= SD_CUTOUT_END_STR)\
                                  .select(["cid", "country", "city", "sd", "appversion", "os", "os_version"])

In [37]:
joinedDF = filteredPingsDF.join(lat_lng, ["country", "city"], "inner")

In [38]:
joinedDF.filter(joinedDF.city == "Porto").take(5)

[Row(country=u'PT', city=u'Porto', cid=u'7b4332a0-e28c-4301-9850-a1136f833394', sd=u'20161120', appversion=u'50', os=u'Windows_NT', os_version=u'6.3', lat=41.15000633, lng=-8.620001262999999),
 Row(country=u'PT', city=u'Porto', cid=u'31f6a4a5-bf81-4be6-a199-ca7df1e5f2ef', sd=u'20161120', appversion=u'49', os=u'Windows_NT', os_version=u'10.0', lat=41.15000633, lng=-8.620001262999999),
 Row(country=u'PT', city=u'Porto', cid=u'6b001f66-ac56-4186-a430-c95d64f8d268', sd=u'20161120', appversion=u'49', os=u'Windows_NT', os_version=u'10.0', lat=41.15000633, lng=-8.620001262999999),
 Row(country=u'PT', city=u'Porto', cid=u'fb4c01f3-1d2c-4bb1-a083-f3f254bd74ed', sd=u'20161120', appversion=u'50', os=u'Windows_NT', os_version=u'10.0', lat=41.15000633, lng=-8.620001262999999),
 Row(country=u'PT', city=u'Porto', cid=u'b0b2656d-dc29-491d-9189-06981fa3296e', sd=u'20161120', appversion=u'50', os=u'Windows_NT', os_version=u'10.0', lat=41.15000633, lng=-8.620001262999999)]

In [39]:
sqlContext.registerDataFrameAsTable(joinedDF, "joinedDF")

In [60]:
query = """
    SELECT country
          , city
          , lat
          , lng
          , sd
          , appversion
          , prop_cnt_month
          , sum_city_month/sum_month as prop_city_month
    FROM(
        SELECT country
              , city
              , lat
              , lng
              , sd
              , appversion
              , cnt
              , cnt/sum(cnt) over(partition by country, city, sd) as prop_cnt_month
              , sum(cnt) over(partition by country, city, sd) as sum_city_month
              , sum(cnt) over(partition by sd) as sum_month
        FROM(
            SELECT country
                  , city
                  , lat
                  , lng
                  , substr(to_date(CAST(UNIX_TIMESTAMP(sd, 'yyyyMMdd') AS TIMESTAMP)), 1, 7) AS sd
                  , appversion
                  , count(distinct cid) as cnt
            FROM joinedDF
            GROUP BY 1, 2, 3, 4, 5, 6
        )
    )
    ORDER BY 1, 2, 5, 6
    """
aggregateDF = sqlContext.sql(query)

In [59]:
aggregateDF.show()

+-------+--------+-----------+-----------+-------+----------+--------------------+--------------------+
|country|    city|        lat|        lng|     sd|appversion|      prop_cnt_month|     prop_city_month|
+-------+--------+-----------+-----------+-------+----------+--------------------+--------------------+
|     AT|  Vienna|48.20001528|16.36663896|2016-12|        50|  0.6413349416968235| 0.03994154112999068|
|     AT|  Vienna|48.20001528|16.36663896|2016-12|        43| 0.02734217933252915| 0.03994154112999068|
|     AT|  Vienna|48.20001528|16.36663896|2016-12|        44|0.008363490148773623| 0.03994154112999068|
|     AT|  Vienna|48.20001528|16.36663896|2016-12|        49|  0.1470044229995979| 0.03994154112999068|
|     AT|  Vienna|48.20001528|16.36663896|2016-12|        47| 0.08387615601125854| 0.03994154112999068|
|     AT|  Vienna|48.20001528|16.36663896|2016-12|        46|0.010695617209489345| 0.03994154112999068|
|     AT|  Vienna|48.20001528|16.36663896|2016-12|        48| 0.

In [64]:
a = aggregateDF.groupBy(["country", "city", "sd"])\
.agg({"lat": "first",
     "lng": "first",
     "appversion": "collect_list",
     "prop_cnt_month": "collect_list",
     "prop_city_month": "first"})\
.withColumnRenamed("first(lat)", "lat")\
.withColumnRenamed("first(lng)", "lng")\
.withColumnRenamed("collect_list(appversion)", "appversion")\
.withColumnRenamed("collect_list(prop_cnt_month)", "prop_cnt_month")\
.withColumnRenamed("first(prop_city_month)", "prop_city_month")
a.show()

+-------+----------+-------+-------------------+--------------------+--------------------+--------------------+-----------+
|country|      city|     sd|                lng|      prop_cnt_month|     prop_city_month|          appversion|        lat|
+-------+----------+-------+-------------------+--------------------+--------------------+--------------------+-----------+
|     CH|    Geneva|2016-09|        6.140028034|[0.01181102362204...|6.077122028131811E-4|[41, 42, 43, 44, ...|46.21000755|
|     FR|      Lyon|2016-12|        4.830030475|[0.00791738382099...|0.018661870041435134|[41, 42, 43, 44, ...|45.77000856|
|     CH|    Geneva|2016-12|        6.140028034|[0.01169590643274...|5.492564160215848E-4|[41, 42, 43, 44, ...|46.21000755|
|     CZ|    Prague|2016-08|        14.46597978|[0.00606512023834...|0.029857195050275603|[41, 42, 43, 44, ...|50.08333701|
|     DE|    Munich|2016-12|        11.57499345|[5.81767409389726...| 0.05521151190055568|[39, 40, 41, 42, ...|48.12994204|
|     BG

In [65]:
j = a.toPandas()

In [66]:
j.to_json("main_cities_sized.json", "records")

In [117]:
query = """
    SELECT country
          , city
          , sd
          , os
          , os_version
          , cnt
    FROM(
        SELECT country
              , city
              , substr(to_date(CAST(UNIX_TIMESTAMP(sd, 'yyyyMMdd') AS TIMESTAMP)), 1, 7) AS sd
              , os
              , concat(split(os_version, "[:numeric:\.:numeric:]")[0],
                       ".",
                       split(os_version, "[:numeric:\.:numeric:]")[1]) as os_version
              , count(distinct cid) as cnt
        FROM joinedDF
        GROUP BY 1, 2, 3, 4, 5
    )
    ORDER BY 1, 2, 3, 4, 5
    """
aggregateDF2 = sqlContext.sql(query)

In [118]:
aggregateDF2.show()

+-------+------+-------+------+----------+---+
|country|  city|     sd|    os|os_version|cnt|
+-------+------+-------+------+----------+---+
|     AT|Vienna|2016-08|Darwin|      10.0|  2|
|     AT|Vienna|2016-08|Darwin|      10.7|  1|
|     AT|Vienna|2016-08|Darwin|      10.8| 73|
|     AT|Vienna|2016-08|Darwin|      11.3|  1|
|     AT|Vienna|2016-08|Darwin|      11.4| 52|
|     AT|Vienna|2016-08|Darwin|      12.2|  3|
|     AT|Vienna|2016-08|Darwin|      12.4|  4|
|     AT|Vienna|2016-08|Darwin|      12.5| 13|
|     AT|Vienna|2016-08|Darwin|      12.6| 22|
|     AT|Vienna|2016-08|Darwin|      13.1|  1|
|     AT|Vienna|2016-08|Darwin|      13.2|  1|
|     AT|Vienna|2016-08|Darwin|      13.3|  4|
|     AT|Vienna|2016-08|Darwin|      13.4| 61|
|     AT|Vienna|2016-08|Darwin|      14.0|  9|
|     AT|Vienna|2016-08|Darwin|      14.1|  7|
|     AT|Vienna|2016-08|Darwin|      14.3|  6|
|     AT|Vienna|2016-08|Darwin|      14.4|  8|
|     AT|Vienna|2016-08|Darwin|      14.5|141|
|     AT|Vien

In [119]:
j = aggregateDF2.toPandas()

In [120]:
j.to_json("os2.json", "records")