In [1]:
import findspark
findspark.init()
from pyspark.sql.functions import col
from pyspark.sql import functions as F
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

from functools import reduce

# Spark session for the prod-vs-test beacon_session QA, on YARN in client mode.
# 'yarn-client' as a master URL is deprecated since Spark 2.0 and rejected by
# Spark 3; the supported form is master 'yarn' plus spark.submit.deployMode.
# The former `.config('spark.shuffle.consolidateFiles')` passed no value (so it
# only stored a placeholder) for an option modern Spark ignores; it is dropped.
# NOTE(review): spark.cores.max applies to standalone/Mesos masters; on YARN the
# executor count comes from spark.executor.instances — confirm it is still wanted.
spark = (
    SparkSession.builder
    .appName('yxing_qa_test')
    .master('yarn')
    .config('spark.submit.deployMode', 'client')
    .config('spark.executor.memory', '28g')
    .config('spark.cores.max', '10')
    .config('spark.driver.memory', '10g')
    .config('spark.executor.instances', '20')
    .config('spark.yarn.queue', 'root.digital.heavy')
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# QA month; both beacon_session paths below are built from it.
month_id = '239'

# NOTE(review): the prod path ends at a specific `beacon_session-00000-10000`
# entry while the test path takes everything under c1=19 — confirm both sides
# cover the same slice before reading the diff counts.
test_bs_path = ("/mapr/ia1.comscore.com/data/partitions/*/beacon/monthly/"
                + month_id
                + "m/tag_5952_snap_testing/beacon_session/*/c1=19")
prod_bs_path = ("/mapr/ia1.comscore.com/data/partitions/*/beacon/monthly/"
                + month_id
                + "m/beacon_session/*/c1=19/beacon_session-00000-10000")

# Same names as snap_schema (the Snapchat feed further down). No load in this
# notebook uses test_schema: the test beacon_session is read with prod_schema.
test_schema = [
    'date', 'app_version', 'country', 'device_model',
    'hashed_idfa', 'os_type', 'os_version', 'publisher_id',
    'total_snap_view', 'edition_view_count', 'total_time_viewed',
    'snap_created_id',
]

# Column names given to the beacon_session files on both sides.
prod_schema = [
    'c1', 'c2', 'uid', 'pattern_id', 'time_id', 'hits',
    'start_ss2k', 'duration', 'session_id', 'ip', 'os', 'bt',
    'dist_ss2k', 'pages', 'iab', 'akamai_id', 'ua_hash', 'ns',
]

In [3]:
test_bs = spark.read.format('csv').option("delimiter", '\t').option("inferSchema", "true").load(test_bs_path).toDF(*prod_schema)

In [None]:
prod_bs = spark.read.format('csv').option("delimiter", '\t').option("inferSchema", "true").load(prod_bs_path).toDF(*prod_schema)

In [4]:
# Row count of the test beacon_session data.
test_bs.count()

58106806506

In [None]:
# Row count of the prod beacon_session data.
prod_bs.count()

In [17]:
test_prod = test_bs.join(prod_bs, prod_schema, how = 'inner')

In [18]:
test_prod.count()

5716431

In [6]:
prod_tmp = prod_bs.select(*(col(x).alias('prod_' + x) for x in prod_bs.columns))
test_tmp = test_bs.select(*(col(x).alias('test_' + x) for x in test_bs.columns))

cond = [prod_tmp['prod_' + x] == test_tmp['test_' + x] for x in prod_bs.columns]

compare = prod_tmp.join(test_tmp, cond, how='full')

test_only = compare.where(
    reduce(lambda x, y: x & y, (col(x).isNull() for x in prod_tmp.columns)))
prod_only = compare.where(
    reduce(lambda x, y: x & y, (col(x).isNull() for x in test_tmp.columns)))

In [7]:
prod_only.count()

731995

In [21]:
# Preview rows present only in prod (every test_* column is null).
prod_only.show()

+-------+-------+--------------------+---------------+------------+---------+---------------+-------------+---------------+--------------+--------+-------+--------------+----------+--------+--------------+--------------------+--------------------+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+--------------+------------+-------+
|prod_c1|prod_c2|            prod_uid|prod_pattern_id|prod_time_id|prod_hits|prod_start_ss2k|prod_duration|prod_session_id|       prod_ip| prod_os|prod_bt|prod_dist_ss2k|prod_pages|prod_iab|prod_akamai_id|        prod_ua_hash|             prod_ns|test_c1|test_c2|test_uid|test_pattern_id|test_time_id|test_hits|test_start_ss2k|test_duration|test_session_id|test_ip|test_os|test_bt|test_dist_ss2k|test_pages|test_iab|test_akamai_id|test_ua_hash|test_ns|
+-------+-------+--------------------+---------------+------------+---------+---------------+-

In [22]:
# Number of rows present only in test (every prod_* column is null).
test_only.count()

95379

In [25]:
prod_only_uid = prod_only.select('prod_uid').distinct()
test_prod_uid = test_prod.select('uid').distinct()

In [29]:
prod_only_uid.join(test_prod_uid,
                   prod_only_uid.prod_uid == test_prod_uid.uid,
                   how='inner').count()

64093

In [30]:
# Distinct uids among prod-only rows.
prod_only_uid.count()

260308

In [31]:
# Distinct uids among rows fully matched on both sides (test_prod inner join).
test_prod_uid.count()

1629223

In [8]:
snap_path = '/mapr/ia1.comscore.com/test_data/census/tag_5920/testing/reformatted/snapchat/*'
snap_schema = ['date', 'app_version', 'country', 'device_model', 'hashed_idfa', 'os_type', 'os_version', 'publisher_id', 
               'total_snap_view', 'edition_view_count', 'total_time_viewed', 'snap_created_id']
snap = spark.read.format('csv').option("delimiter", '\t').option("inferSchema", "true").load(snap_path).toDF(*snap_schema)

In [81]:
snap.filter('snap_created_id==1').filter("date=='2019-11-30 00:00:00 UTC'").show(20, False)

+-----------------------+-----------+-------+------------+----------------------------------------------------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+
|date                   |app_version|country|device_model|hashed_idfa                                                     |os_type|os_version|publisher_id    |total_snap_view|edition_view_count|total_time_viewed|snap_created_id|
+-----------------------+-----------+-------+------------+----------------------------------------------------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+
|2019-11-30 00:00:00 UTC|10.67.0.0  |US     |LM-X220     |2f76c7ba8016d26de9d7320859e7ed457df1dbeabf0ed8d91713ea816ad9ad92|Android|9         |5340479800213504|49             |3                 |390              |1              |
|2019-11-30 00:00:00 UTC|10.70.0.0  |US     |SM-A102U    |d303287a3d65effe81a2ee1d26

In [82]:
# Untruncated preview of 2019-11-30 rows with snap_created_id == 0.
snap.filter('snap_created_id==0').filter("date=='2019-11-30 00:00:00 UTC'").show(n=20, truncate=False)

+-----------------------+-----------+-------+---------------+----------------------------------------------------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+
|date                   |app_version|country|device_model   |hashed_idfa                                                     |os_type|os_version|publisher_id    |total_snap_view|edition_view_count|total_time_viewed|snap_created_id|
+-----------------------+-----------+-------+---------------+----------------------------------------------------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+
|2019-11-30 00:00:00 UTC|10.69.0.0  |US     |SM-J727T       |56BA92B6A50FDDEB029EEE7149CB8140391B670BD1E6F963464EB6C824952A9D|Android|8.1.0     |5134826088431616|580            |1                 |1798.8           |0              |
|2019-11-30 00:00:00 UTC|10.61.0.0  |US     |LM-X210CM      |2F725E7C81C

In [53]:
# Snap rows with snap_created_id == 1.
snap.filter('snap_created_id==1').count()

179393151

In [54]:
# Total snap rows.
snap.count()

1248158651

In [56]:
# Share of snap rows with snap_created_id == 1. Both numbers are hard-coded
# copies of the snap_created_id==1 count and the total snap count; update them
# by hand if the data changes.
179393151./1248158651.

0.14372624093601705

In [64]:
# Distinct hashed_idfa values among snap_created_id == 1 rows.
snap.filter('snap_created_id==1').select('hashed_idfa').distinct().count()

10380812

In [65]:
# Distinct hashed_idfa values across all snap rows.
snap.select('hashed_idfa').distinct().count()

54221838

In [66]:
# Share of distinct hashed_idfa values seen with snap_created_id == 1
# (hard-coded copies of the two distinct counts).
10380812./54221838.

0.19145075827197153

In [162]:
snap_c = snap.filter('snap_created_id==1').select('hashed_idfa')
prod_bs_t = prod_bs.withColumn('uid_t', F.lower(col('uid')))
prod_snap_c = prod_bs_t.alias('a').join(snap_c, snap_c.hashed_idfa == prod_bs_t.uid_t, 'inner').select('a.*')

In [163]:
prod_snap_c.count()

0

In [71]:
snap_nc = snap.filter('snap_created_id==0').select('hashed_idfa')
prod_snap_nc = prod_bs.alias('a').join(snap_nc, snap_nc.hashed_idfa == prod_bs.ua_hash, 'inner').select('a.*')

In [72]:
prod_snap_nc.count()

0

In [34]:
# Column types as inferred by the CSV reader for prod beacon_session.
prod_bs.printSchema()

root
 |-- c1: integer (nullable = true)
 |-- c2: integer (nullable = true)
 |-- uid: string (nullable = true)
 |-- pattern_id: integer (nullable = true)
 |-- time_id: integer (nullable = true)
 |-- hits: integer (nullable = true)
 |-- start_ss2k: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- ip: string (nullable = true)
 |-- os: integer (nullable = true)
 |-- bt: integer (nullable = true)
 |-- dist_ss2k: integer (nullable = true)
 |-- pages: integer (nullable = true)
 |-- iab: integer (nullable = true)
 |-- akamai_id: long (nullable = true)
 |-- ua_hash: string (nullable = true)
 |-- ns: string (nullable = true)



In [230]:
test_only.coalesce(1).write.format("csv").options(delimiter=',').save('snap_ccpa/')

In [235]:
test_only.coalesce(2).write.format("csv").options(delimiter=',').save('test_ccpa/')

AnalysisException: u'path maprfs:/user/yxing/test_ccpa already exists.;'

In [36]:
# Preview rows present only in prod (every test_* column is null).
prod_only.show()

+-------+-------+--------------------+---------------+------------+---------+---------------+-------------+---------------+--------------+--------+-------+--------------+----------+--------+--------------+--------------------+--------------------+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+--------------+------------+-------+
|prod_c1|prod_c2|            prod_uid|prod_pattern_id|prod_time_id|prod_hits|prod_start_ss2k|prod_duration|prod_session_id|       prod_ip| prod_os|prod_bt|prod_dist_ss2k|prod_pages|prod_iab|prod_akamai_id|        prod_ua_hash|             prod_ns|test_c1|test_c2|test_uid|test_pattern_id|test_time_id|test_hits|test_start_ss2k|test_duration|test_session_id|test_ip|test_os|test_bt|test_dist_ss2k|test_pages|test_iab|test_akamai_id|test_ua_hash|test_ns|
+-------+-------+--------------------+---------------+------------+---------+---------------+-

In [38]:
# Schema of the side-by-side diff: prod_* columns followed by test_* columns.
prod_only.printSchema()

root
 |-- prod_c1: integer (nullable = true)
 |-- prod_c2: integer (nullable = true)
 |-- prod_uid: string (nullable = true)
 |-- prod_pattern_id: integer (nullable = true)
 |-- prod_time_id: integer (nullable = true)
 |-- prod_hits: integer (nullable = true)
 |-- prod_start_ss2k: integer (nullable = true)
 |-- prod_duration: integer (nullable = true)
 |-- prod_session_id: integer (nullable = true)
 |-- prod_ip: string (nullable = true)
 |-- prod_os: integer (nullable = true)
 |-- prod_bt: integer (nullable = true)
 |-- prod_dist_ss2k: integer (nullable = true)
 |-- prod_pages: integer (nullable = true)
 |-- prod_iab: integer (nullable = true)
 |-- prod_akamai_id: long (nullable = true)
 |-- prod_ua_hash: string (nullable = true)
 |-- prod_ns: string (nullable = true)
 |-- test_c1: integer (nullable = true)
 |-- test_c2: long (nullable = true)
 |-- test_uid: string (nullable = true)
 |-- test_pattern_id: integer (nullable = true)
 |-- test_time_id: integer (nullable = true)
 |-- test_h

In [69]:
# Distinct uid count in prod.
prod_bs.select('uid').distinct().count()

1825437

In [70]:
# Distinct uid count in test.
test_bs.select('uid').distinct().count()

1663391

In [94]:
# Untruncated sample of test ns strings that carry an ns_ak_d_sha2 parameter.
test_bs.filter(test_bs.ns.contains('ns_ak_d_sha2')).select('ns').show(n=20, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ns                                                                                                                                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ns_ap_dbt=0&c4=&ns_ak=&ns_ak_d_sha2=790B262859865ED6DF1CD96396652A37920FE9F957C7BC20CD0151FE0E2B1BAF&ns_ap_fg=0&ns_ap_dft=1000&ns_ap_pfm=ANDROID&ns_ap_sv=5.3.4.170714&ns_ap_as=1&ns_ak_d_vce=&SNAP=   |
|ns_ap_dbt=0&c4=&ns_ak=&ns_ak_d_sha2=04505D0FE454CD73C7B903DC836744AF460B21730C0B7A7F57316113D4984052&ns_ap_fg=0&ns_ap_dft=102300&ns_ap_pfm=IOS&ns_ap_sv=5.3.4.170714&ns_ap_as=1&ns_ak_d_vce=&SN

In [127]:
# Untruncated sample of prod ns strings that carry an ns_ak_d_sha2 parameter.
prod_bs.filter(prod_bs.ns.contains('ns_ak_d_sha2')).select('ns').show(n=20, truncate=False)

+---+
|ns |
+---+
+---+



In [99]:
# Prod rows whose ns mentions this particular hash value.
prod_bs.filter(col('ns').contains('D01725EB28A8683B534DFC2111EFF6BA53A2A5D4376ADA0A1CC1B0830DEBC180')).show(n=20, truncate=True)

+---+---+---+----------+-------+----+----------+--------+----------+---+---+---+---------+-----+---+---------+-------+---+
| c1| c2|uid|pattern_id|time_id|hits|start_ss2k|duration|session_id| ip| os| bt|dist_ss2k|pages|iab|akamai_id|ua_hash| ns|
+---+---+---+----------+-------+----+----------+--------+----------+---+---+---+---------+-----+---+---------+-------+---+
+---+---+---+----------+-------+----+----------+--------+----------+---+---+---+---------+-----+---+---------+-------+---+



In [101]:
# First 20 snap rows whose hashed_idfa contains this hash value.
snap.filter(col('hashed_idfa').contains('A40A557185268F87C671AEE8E5FE05366D93DB0C3DF65548B3B55E9B317B5390')).show(n=20)

+--------------------+-----------+-------+------------+--------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+
|                date|app_version|country|device_model|         hashed_idfa|os_type|os_version|    publisher_id|total_snap_view|edition_view_count|total_time_viewed|snap_created_id|
+--------------------+-----------+-------+------------+--------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+
|2019-11-12 00:00:...|  10.69.5.0|     US|   SM-G935R4|A40A557185268F87C...|Android|     8.0.0|5106373324963840|             19|                 1|               87|              0|
|2019-11-16 00:00:...|  10.70.0.0|     US|   SM-G935R4|A40A557185268F87C...|Android|     8.0.0|5722721933590528|             88|                 2|            570.6|              0|
|2019-11-14 00:00:...|  10.70.0.0|     US|   SM-G935R4|A40A557185268F87C...|Android|     8

In [105]:
# Prod-only ns strings that carry an ns_ak_d_sha2 parameter.
prod_only.filter(col('prod_ns').contains('ns_ak_d_sha2')).select('prod_ns').show(n=20, truncate=False)

+-------+
|prod_ns|
+-------+
+-------+



In [138]:
from pyspark.sql.functions import regexp_extract, col
t = test_bs.filter(test_bs.ns.contains('ns_ak_d_sha2')).select('ns').withColumn('nsaksha2', regexp_extract(col('ns'), '\&ns_ak_d_sha2=(.*?)\&', 0))

In [148]:
tt = t.withColumn('nask', t['nsaksha2'].substr(15,64))

In [156]:
from pyspark.sql.functions import upper
snap_t = snap.withColumn('nsak', upper(snap.hashed_idfa))

In [157]:
ts = tt.join(snap_t, tt.nask == snap_t.nsak, how= 'left')

In [158]:
# Preview of the ns-hash -> snap left join.
ts.show()

+--------------------+--------------------+--------------------+--------------------+-----------+-------+------------+--------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+--------------------+
|                  ns|            nsaksha2|                nask|                date|app_version|country|device_model|         hashed_idfa|os_type|os_version|    publisher_id|total_snap_view|edition_view_count|total_time_viewed|snap_created_id|                nsak|
+--------------------+--------------------+--------------------+--------------------+-----------+-------+------------+--------------------+-------+----------+----------------+---------------+------------------+-----------------+---------------+--------------------+
|ns_ap_dbt=0&c4=&n...|&ns_ak_d_sha2=DE7...|DE70A20F550AAA851...|2019-11-19 00:00:...|  10.63.1.1|     US|  iPhone11,2|DE70A20F550AAA851...|    iOS|    13.1.3|6328325341446144|              1|           

In [159]:
# Rows per snap_created_id after the left join; null means no snap row matched
# the ns hash (or the snap flag itself was null).
ts.groupBy('snap_created_id').count().show()

+---------------+-------+
|snap_created_id|  count|
+---------------+-------+
|              0|5827839|
|           null|  13816|
+---------------+-------+



In [165]:
# A bare `test_bs.select` only evaluates to the bound method and shows no rows;
# display an untruncated sample instead.
# NOTE(review): the recorded output looks like an untruncated show() of test_bs
# — confirm this is the intended view.
test_bs.show(20, False)

+---+----------------+----------------------+----------+-------+----+----------+--------+----------+---------------+--------+---+---------+-----+---+----------+--------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|c1 |c2              |uid                   |pattern_id|time_id|hits|start_ss2k|duration|session_id|ip             |os      |bt |dist_ss2k|pages|iab|akamai_id |ua_hash                         |ns                                                                                                                                                                                                                                                                                                      

In [167]:
# Join on no ns values
prod_tmp_ns = prod_bs.select(*(col(x).alias('prod_' + x) for x in prod_bs.columns))
test_tmp_ns = test_bs.select(*(col(x).alias('test_' + x) for x in test_bs.columns))

cond_ns = [prod_tmp['prod_' + x] == test_tmp['test_' + x] for x in prod_bs.columns if x != 'ns']

compare_ns = prod_tmp.join(test_tmp, cond_ns, how='full')

In [168]:
prod_only_ns = compare_ns.where(
    reduce(lambda x, y: x | y, (col(x).isNull() for x in test_tmp_ns.columns if x != 'test_ns')))

In [175]:
prod_only_ns.filter("test_ns != 'null'").show(20, False)

+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+--------------+------------+-------+-------+----------------+----------------------+---------------+------------+---------+---------------+-------------+---------------+-------+--------+-------+--------------+----------+--------+--------------+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|prod_c1|prod_c2|prod_uid|prod_pattern_id|prod_time_id|prod_hits|prod_start_ss2k|prod_duration|prod_session_id|prod_ip|prod_os|prod_bt|prod_dist_ss2k|prod_pages|prod_iab|prod_akamai_id|prod_ua_hash|prod_ns|test_c1|test_c2         |test_uid              |test_pattern_id|test_time_id|test_hits|test_start_ss2k|test_duration|test_session_id|test_ip|test_os |test_bt|test_dist

In [176]:
# Display the non-ns join predicates used for compare_ns.
cond_ns

[Column<(prod_c1 = test_c1)>,
 Column<(prod_c2 = test_c2)>,
 Column<(prod_uid = test_uid)>,
 Column<(prod_pattern_id = test_pattern_id)>,
 Column<(prod_time_id = test_time_id)>,
 Column<(prod_hits = test_hits)>,
 Column<(prod_start_ss2k = test_start_ss2k)>,
 Column<(prod_duration = test_duration)>,
 Column<(prod_session_id = test_session_id)>,
 Column<(prod_ip = test_ip)>,
 Column<(prod_os = test_os)>,
 Column<(prod_bt = test_bt)>,
 Column<(prod_dist_ss2k = test_dist_ss2k)>,
 Column<(prod_pages = test_pages)>,
 Column<(prod_iab = test_iab)>,
 Column<(prod_akamai_id = test_akamai_id)>,
 Column<(prod_ua_hash = test_ua_hash)>]

In [177]:
# Row count of the full join that ignores ns.
compare_ns.count()

6543805

In [178]:
# Row count of the full join on every column, for comparison with compare_ns.
compare.count()

6543805

In [179]:
prod_test = prod_tmp.join(test_tmp, cond_ns, how='inner')

In [185]:
compare_pt = compare.filter('prod_ns != test_ns')

In [190]:
compare_ns.select(['prod_ns', 'test_ns']).show(20, False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|prod_ns                                                                                                                                                                                                                                                                                                                         |test_ns                           

In [194]:
# Rows of the ns-agnostic full join with no test-side ns.
compare_ns.filter(col('test_ns').isNull()).show(n=20, truncate=False)

+-------+-------+----------------------+---------------+------------+---------+---------------+-------------+---------------+---------------+--------+-------+--------------+----------+--------+--------------+--------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+--------------+------------+-------+
|prod_c1|prod_c2|prod_uid              |prod_pattern_id|prod_time_id|prod_hits|prod_start_ss2k|prod_duration|prod_session_id|prod_ip        |prod_os |prod_bt|prod_dist_ss2k|prod_pages|prod_iab|prod_akamai_id|prod_ua_hash  

In [195]:
# Non-ns matches whose test-side ns is null.
prod_test.filter(col('test_ns').isNull()).show(n=20, truncate=False)

+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+--------------+------------+-------+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+--------------+------------+-------+
|prod_c1|prod_c2|prod_uid|prod_pattern_id|prod_time_id|prod_hits|prod_start_ss2k|prod_duration|prod_session_id|prod_ip|prod_os|prod_bt|prod_dist_ss2k|prod_pages|prod_iab|prod_akamai_id|prod_ua_hash|prod_ns|test_c1|test_c2|test_uid|test_pattern_id|test_time_id|test_hits|test_start_ss2k|test_duration|test_session_id|test_ip|test_os|test_bt|test_dist_ss2k|test_pages|test_iab|test_akamai_id|test_ua_hash|test_ns|
+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+------

In [198]:
# Rows in compare_ns whose test-side ns contains an ns_ak_d_sha2 parameter.
compare_ns.filter(compare_ns.test_ns.contains('ns_ak_d_sha2')).count()

95379

In [221]:
test_only.coalesce(1).write.format("csv").options(delimiter=',').mode("overwrite") \
    .save("snapchat_ccpa_test")

In [225]:
test_only.repartition(1).write.csv('snapchat_ccpa_test')

In [226]:
prod_only.repartition(1).write.csv('snapchat_ccpa_prod')

In [219]:
prod_only = compare.where(
    reduce(lambda x, y: x & y, (col(x).isNull() for x in test_tmp.columns)))
prod_only.coalesce(1).write.format("csv").mode('overwrite').options(delimiter=',',header = 'true').save('snap_ccpa/prod_only')

In [211]:
# Preview rows present only in test (every prod_* column is null).
test_only.show(20)

+-------+-------+--------+---------------+------------+---------+---------------+-------------+---------------+-------+-------+-------+--------------+----------+--------+--------------+------------+-------+-------+----------------+--------------------+---------------+------------+---------+---------------+-------------+---------------+-------+--------+-------+--------------+----------+--------+--------------+------------+--------------------+
|prod_c1|prod_c2|prod_uid|prod_pattern_id|prod_time_id|prod_hits|prod_start_ss2k|prod_duration|prod_session_id|prod_ip|prod_os|prod_bt|prod_dist_ss2k|prod_pages|prod_iab|prod_akamai_id|prod_ua_hash|prod_ns|test_c1|         test_c2|            test_uid|test_pattern_id|test_time_id|test_hits|test_start_ss2k|test_duration|test_session_id|test_ip| test_os|test_bt|test_dist_ss2k|test_pages|test_iab|test_akamai_id|test_ua_hash|             test_ns|
+-------+-------+--------+---------------+------------+---------+---------------+-------------+-----------

In [5]:
# Test rows whose ns carries an empty ns_ak_d_sha2 value ('&ns_ak_d_sha2=&').
test_bs.filter(test_bs.ns.contains('&ns_ak_d_sha2=&')).count()

315856646

In [10]:
ca_traffic = test_bs.filter(test_bs.ns.contains('&ns_ak_d_sha2=&'))

In [11]:
ca_traffic.coalesce(1).write.format("csv").options(delimiter=',', header='true').mode("overwrite") \
    .save("snapchat_ccpa_ca_traffic")

In [13]:
snap_ca = snap.filter('snap_created_id==1')
snap_ca.repartition(1).write.format("csv").options(delimiter=',', header='true').mode("overwrite") \
    .save("snap_ca_traffic")

In [7]:
# Share of all test_bs rows with an empty ns_ak_d_sha2 (hard-coded copies of the
# empty-sha2 count and the total test_bs count).
315856646/58106806506.

0.005435794272524428

In [8]:
# Test rows whose ns contains the ns_ak_d_sha2 key at all (empty or not).
test_bs.filter(test_bs.ns.contains('ns_ak_d_sha2')).count()

1123638007

In [10]:
# Share of ns_ak_d_sha2-carrying test rows whose value is empty (hard-coded
# copies of the empty-sha2 count and the any-sha2 count).
315856646/1123638007.

0.2811017819193437