I want to update the Wikipedia Preview ETL job to exclude bot traffic and improve our device classification ([T332960](https://phabricator.wikimedia.org/T332960)).

In [2]:
import pandas as pd
import wmfdata as wmf
from wmfdata.utils import pd_display_all

First, I get the existing data output for a recent day, so I can make sure the updated job accurately replicates it.

In [9]:
test_query = """
    SELECT
        year,
        month,
        day,
        device_type,
        referer_host IN ('bjmoreshet.org', 'richardbevan.co.uk') AS site_had_bots,
        SUM(previews) AS previews,
        SUM(pageviews) AS pageviews
    FROM {table}
    WHERE
        year = 2023
        AND month = 4
        AND day = 15
    GROUP BY
        year,
        month,
        day,
        device_type,
        referer_host IN ('bjmoreshet.org', 'richardbevan.co.uk')
"""

old = wmf.presto.run(test_query.format(table="wmf_product.wikipediapreview_stats"))

I drop the existing test table.

In [8]:
wmf.hive.run("DROP TABLE nshahquinn.wikipediapreview_stats_test")



And run the updated job for the same day in a non-Jupyter shell using `deploy-oozie-job wikipediapreview_stats --test`.

Next, I get the new output from the test table and compare to the old.

In [18]:
new = wmf.presto.run(test_query.format(table="nshahquinn.wikipediapreview_stats_test"))

In [25]:
old.sort_values(["site_had_bots", "device_type"])

Unnamed: 0,year,month,day,device_type,site_had_bots,previews,pageviews
5,2023,4,15,non-touch,False,906,31
2,2023,4,15,touch,False,108,48
4,2023,4,15,non-touch,True,861,10
3,2023,4,15,touch,True,3,1
0,2023,4,15,non-touch,,3,4
1,2023,4,15,touch,,0,6


The new output looks right, matching except for:
1. a big loss of non-touch previews on sites which had a lot of bot traffic on our previous check
2. some of the "touch" traffic being recategorized as "touchscreen computer"
3. a very small loss of non-touch previews on non-bot-heavy sites (906 to 878) and of one pageview among the traffic without a referrer (6 to 5).

In [24]:
new.sort_values(["site_had_bots", "device_type"])

Unnamed: 0,year,month,day,device_type,site_had_bots,previews,pageviews
6,2023,4,15,non-touch,False,878,31
7,2023,4,15,touch,False,95,41
1,2023,4,15,touchscreen computer,False,13,7
2,2023,4,15,non-touch,True,191,10
5,2023,4,15,touch,True,3,1
0,2023,4,15,non-touch,,3,4
3,2023,4,15,touch,,0,3
4,2023,4,15,touchscreen computer,,0,2
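
The same comparison can be computed rather than eyeballed. This is a minimal sketch, not part of the original analysis: it re-enters the two outputs above by hand (in the notebook, the `old` and `new` data frames would be used instead), folds "touchscreen computer" back into "touch" so the device types line up, and subtracts old from new. The string labels for `site_had_bots` and the `collapse` helper are names I've made up for illustration.

```python
import pandas as pd

columns = ["device_type", "site_had_bots", "previews", "pageviews"]

# Values copied from the old and new outputs above. "no referrer" stands in
# for the rows where site_had_bots is null.
old = pd.DataFrame([
    ("non-touch", "no", 906, 31),
    ("touch", "no", 108, 48),
    ("non-touch", "yes", 861, 10),
    ("touch", "yes", 3, 1),
    ("non-touch", "no referrer", 3, 4),
    ("touch", "no referrer", 0, 6),
], columns=columns)

new = pd.DataFrame([
    ("non-touch", "no", 878, 31),
    ("touch", "no", 95, 41),
    ("touchscreen computer", "no", 13, 7),
    ("non-touch", "yes", 191, 10),
    ("touch", "yes", 3, 1),
    ("non-touch", "no referrer", 3, 4),
    ("touch", "no referrer", 0, 3),
    ("touchscreen computer", "no referrer", 0, 2),
], columns=columns)


def collapse(df):
    """Fold the new "touchscreen computer" type into "touch" and re-aggregate."""
    device = df["device_type"].replace({"touchscreen computer": "touch"})
    return (
        df.assign(device_type=device)
        .groupby(["site_had_bots", "device_type"])[["previews", "pageviews"]]
        .sum()
    )


# Negative numbers are traffic that the updated job no longer counts.
diff = collapse(new) - collapse(old)
print(diff[(diff != 0).any(axis=1)])
```

With these values, the only non-zero differences are 670 fewer non-touch previews on the bot-heavy sites, 28 fewer non-touch previews elsewhere, and one fewer pageview in the traffic without a referrer. The touch totals are unchanged once the two touch categories are combined.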


Now, to deploy the new job and backfill using the update. We have source data going back 90 days, so we'll backfill for the past 85 days, starting on 2023-01-23.

In [16]:
pd.Timestamp.today().date() - pd.Timedelta(days=85)

datetime.date(2023, 1, 23)
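
The arithmetic behind the backfill window can be sketched with the standard library alone. The run date is pinned to 2023-04-18, which is consistent with the output above and the log timestamps, so that the result is reproducible. `RETENTION_DAYS` and `BACKFILL_DAYS` are names I've introduced for the 90 and 85 days mentioned in the text.

```python
from datetime import date, timedelta

RETENTION_DAYS = 90  # how far back the source data goes, per the text above
BACKFILL_DAYS = 85   # how far back we choose to backfill

run_date = date(2023, 4, 18)  # pinned here; the notebook uses today's date
backfill_start = run_date - timedelta(days=BACKFILL_DAYS)
oldest_source_day = run_date - timedelta(days=RETENTION_DAYS)

# Days of slack between the first backfilled day and the oldest source day.
margin = (backfill_start - oldest_source_day).days

print(backfill_start, oldest_source_day, margin)  # 2023-01-23 2023-01-18 5
```

The five-day margin presumably leaves room for source data to keep expiring while the deployment and backfill are in progress.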

I kill the existing Oozie job using `oozie job -kill 0149075-220913162928808-oozie-oozi-C`.

Next, I back up the existing dataset.

In [27]:
wmf.spark.run("DROP TABLE nshahquinn.wikipediapreview_stats_backup")

In [28]:
wmf.spark.run([
    """
    CREATE TABLE nshahquinn.wikipediapreview_stats_backup
    LIKE wmf_product.wikipediapreview_stats
    """,
    """
    INSERT INTO nshahquinn.wikipediapreview_stats_backup
    SELECT *
    FROM wmf_product.wikipediapreview_stats
    """
])    

23/04/18 02:53:17 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
                                                                                

Now, I replace the existing dataset with a truncated version.

In [29]:
wmf.hive.run("DROP TABLE wmf_product.wikipediapreview_stats")

wmf.hive.run("""
CREATE EXTERNAL TABLE wmf_product.wikipediapreview_stats (
    `pageviews`      bigint  COMMENT 'Number of pageviews shown as a result of a clickthrough from a Wikipedia Preview preview',
    `previews`       bigint  COMMENT 'Number of API requests for article preview content made by Wikipedia Preview clients',
    `year`           int     COMMENT 'Unpadded year of request',
    `month`          int     COMMENT 'Unpadded month of request',
    `day`            int     COMMENT 'Unpadded day of request',
    `device_type`    string  COMMENT 'Type of device used by the client: touch or non-touch',
    `referer_host`   string  COMMENT 'Host from referer parsing',
    `continent`      string  COMMENT 'Continent of the accessing agents (maxmind GeoIP database)',
    `country_code`   string  COMMENT 'Country iso code of the accessing agents (maxmind GeoIP database)',
    `country`        string  COMMENT 'Country (text) of the accessing agents (maxmind GeoIP database)',
    `instrumentation_version` int COMMENT 'Version number incremented along with major instrumentation changes'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs://analytics-hadoop//user/analytics-product/wikipediapreview_stats/daily'
""")

wmf.hive.run([
    "SET hive.exec.compress.output=true",
    "SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec",
    """
    INSERT INTO wmf_product.wikipediapreview_stats
    SELECT *
    FROM nshahquinn.wikipediapreview_stats_backup
    WHERE
        year < 2023 
        OR (year = 2023 AND month = 1 AND day < 23)
    -- The ORDER BY forces a single output file 
    ORDER BY
        year,
        month,
        day
    LIMIT 10000000
    """
])
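
The `WHERE` clause is meant to keep exactly the rows dated before the backfill start of 2023-01-23. Since the table stores year, month, and day as separate integers, it's easy to get such a condition subtly wrong, so here is a small sketch that checks a Python transcription of the clause against a plain date comparison. The function name and the date range walked are my own choices, not anything from the job.

```python
from datetime import date, timedelta

BACKFILL_START = date(2023, 1, 23)


def kept_by_where_clause(year, month, day):
    """Python transcription of the WHERE clause in the INSERT above."""
    return year < 2023 or (year == 2023 and month == 1 and day < 23)


# Walk every day from before the table's first rows to after the run date.
first, last = date(2020, 10, 1), date(2023, 4, 30)
days = [first + timedelta(days=n) for n in range((last - first).days + 1)]

# Any day where the clause and the date comparison disagree is a bug.
mismatches = [
    d for d in days
    if kept_by_where_clause(d.year, d.month, d.day) != (d < BACKFILL_START)
]
print(len(days), "days checked,", len(mismatches), "mismatches")
```

The clause only works this simply because the cutoff falls in January. A cutoff later in the year would also need a term for the earlier months of 2023.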



Checking that I loaded what I wanted:

In [31]:
daily_counts = wmf.hive.run("""
    SELECT
        year,
        month,
        day,
        SUM(previews) AS previews,
        SUM(pageviews) AS pageviews
    FROM wmf_product.wikipediapreview_stats
    GROUP BY
        year,
        month,
        day
    ORDER BY 
        year,
        month,
        day
    LIMIT 10000
""")



In [32]:
daily_counts.head()

Unnamed: 0,year,month,day,previews,pageviews
0,2020,10,20,136,2
1,2020,10,21,44,4
2,2020,10,22,18,0
3,2020,10,23,72,0
4,2020,10,26,186,2


In [33]:
daily_counts.tail()

Unnamed: 0,year,month,day,previews,pageviews
895,2023,4,13,1203,189
896,2023,4,14,1259,110
897,2023,4,15,1881,100
898,2023,4,16,1270,109
899,2023,4,17,1837,110


23/04/18 03:07:52 WARN UserGroupInformation: Not attempting to re-login since the last re-login was attempted less than 60 seconds before. Last Login=1681787236814


Um, no, I didn't get the right data: the table still runs through 2023-04-17 instead of stopping at 2023-01-22. And now I remember that, since this is an external Hive table, dropping it didn't delete the underlying data files in HDFS. I'm pretty sure I made this same mistake, and had to rediscover it, during every previous update of this ETL job too...

In [34]:
!hdfs dfs -ls /user/analytics-product/wikipediapreview_stats/daily

Found 2 items
-rwxrwxr-x   3 neilpquinn-wmf    analytics-privatedata-users     504405 2023-04-18 02:59 /user/analytics-product/wikipediapreview_stats/daily/000000_0.gz
-rw-r--r--   3 analytics-product hdfs                            650266 2023-04-18 00:44 /user/analytics-product/wikipediapreview_stats/daily/data.gz
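
Because an external table reads every file under its location, a listing like this one is worth checking before loading anything. As a rough sketch of how that check could be automated, the helper below parses `hdfs dfs -ls` output and returns the files it finds. `parse_hdfs_ls` is a hypothetical helper, not part of `wmfdata` or any Hadoop tooling, and it assumes paths without spaces.

```python
def parse_hdfs_ls(listing):
    """Return (path, size_in_bytes) for each entry in `hdfs dfs -ls` output."""
    entries = []
    for line in listing.splitlines():
        fields = line.split()
        # Entry lines have 8 fields: permissions, replication, owner, group,
        # size, date, time, path. The "Found N items" header does not.
        if len(fields) == 8 and fields[4].isdigit():
            entries.append((fields[7], int(fields[4])))
    return entries


# The listing shown above, pasted in as a string.
listing = """Found 2 items
-rwxrwxr-x   3 neilpquinn-wmf    analytics-privatedata-users     504405 2023-04-18 02:59 /user/analytics-product/wikipediapreview_stats/daily/000000_0.gz
-rw-r--r--   3 analytics-product hdfs                            650266 2023-04-18 00:44 /user/analytics-product/wikipediapreview_stats/daily/data.gz
"""

entries = parse_hdfs_ls(listing)
names = [path.rsplit("/", 1)[-1] for path, _ in entries]
print(names)  # ['000000_0.gz', 'data.gz']
```

Here the result shows two files where one was expected: the original `data.gz` that survived the `DROP TABLE`, plus the freshly written `000000_0.gz`.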


In [37]:
!hdfs dfs -rm /user/analytics-product/wikipediapreview_stats/daily/data.gz

23/04/18 03:09:47 INFO fs.TrashPolicyDefault: Moved: 'hdfs://analytics-hadoop/user/analytics-product/wikipediapreview_stats/daily/data.gz' to trash at: hdfs://analytics-hadoop/user/neilpquinn-wmf/.Trash/Current/user/analytics-product/wikipediapreview_stats/daily/data.gz


In [38]:
!hdfs dfs -rm /user/analytics-product/wikipediapreview_stats/daily/000000_0.gz

23/04/18 03:10:14 INFO fs.TrashPolicyDefault: Moved: 'hdfs://analytics-hadoop/user/analytics-product/wikipediapreview_stats/daily/000000_0.gz' to trash at: hdfs://analytics-hadoop/user/neilpquinn-wmf/.Trash/Current/user/analytics-product/wikipediapreview_stats/daily/000000_0.gz


Now, to repeat the process:

In [41]:
wmf.hive.run("DROP TABLE wmf_product.wikipediapreview_stats")

wmf.hive.run("""
CREATE EXTERNAL TABLE wmf_product.wikipediapreview_stats (
    `pageviews`      bigint  COMMENT 'Number of pageviews shown as a result of a clickthrough from a Wikipedia Preview preview',
    `previews`       bigint  COMMENT 'Number of API requests for article preview content made by Wikipedia Preview clients',
    `year`           int     COMMENT 'Unpadded year of request',
    `month`          int     COMMENT 'Unpadded month of request',
    `day`            int     COMMENT 'Unpadded day of request',
    `device_type`    string  COMMENT 'Type of device used by the client: touch or non-touch',
    `referer_host`   string  COMMENT 'Host from referer parsing',
    `continent`      string  COMMENT 'Continent of the accessing agents (maxmind GeoIP database)',
    `country_code`   string  COMMENT 'Country iso code of the accessing agents (maxmind GeoIP database)',
    `country`        string  COMMENT 'Country (text) of the accessing agents (maxmind GeoIP database)',
    `instrumentation_version` int COMMENT 'Version number incremented along with major instrumentation changes'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs://analytics-hadoop//user/analytics-product/wikipediapreview_stats/daily'
""")

wmf.hive.run([
    "SET hive.exec.compress.output=true",
    "SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec",
    """
    INSERT INTO wmf_product.wikipediapreview_stats
    SELECT *
    FROM nshahquinn.wikipediapreview_stats_backup
    WHERE
        year < 2023 
        OR (year = 2023 AND month = 1 AND day < 23)
    -- The ORDER BY forces a single output file 
    ORDER BY
        year,
        month,
        day
    LIMIT 10000000
    """
])



And checking again:

In [43]:
daily_counts = wmf.hive.run("""
    SELECT
        year,
        month,
        day,
        SUM(previews) AS previews,
        SUM(pageviews) AS pageviews
    FROM wmf_product.wikipediapreview_stats
    GROUP BY
        year,
        month,
        day
    ORDER BY 
        year,
        month,
        day
    LIMIT 10000
""")



In [44]:
daily_counts.head()

Unnamed: 0,year,month,day,previews,pageviews
0,2020,10,20,68,1
1,2020,10,21,22,2
2,2020,10,22,9,0
3,2020,10,23,36,0
4,2020,10,26,93,1


In [45]:
daily_counts.tail()

Unnamed: 0,year,month,day,previews,pageviews
810,2023,1,18,4229,134
811,2023,1,19,6928,221
812,2023,1,20,3875,140
813,2023,1,21,2211,121
814,2023,1,22,1820,70


23/04/18 13:43:12 WARN Client: Failed to cleanup staging dir hdfs://analytics-hadoop/user/neilpquinn-wmf/.sparkStaging/application_1678266962370_232108
org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot delete /user/neilpquinn-wmf/.sparkStaging/application_1678266962370_232108. Name node is in safe mode.
It was turned on manually. Use "hdfs dfsadmin -safemode leave" to turn safe mode off. NamenodeHostName:an-master1001.eqiad.wmnet
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.newSafemodeException(FSNamesystem.java:1436)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkNameNodeSafeMode(FSNamesystem.java:1423)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:2867)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1110)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:648)
	at org.apache.hado

Much better! (The Hadoop error is due to unrelated maintenance on the cluster.) Now, I deploy the updated job:

```
neilpquinn-wmf@stat1005:~/product_analytics_jobs$ ./deploy-oozie-job wikipediapreview_stats --production
The HDFS job directory will be hdfs:///user/analytics-product/jobs/wikipediapreview_stats
Removing old job files in the job directory...
Creating the job directory...
Putting new job files into the job directory...
Submitting the job...
job: 0174537-220913162928808-oozie-oozi-C
```

Well, after letting the job run overnight, I have two problems:
1. after running correctly for the first 60 days, the task for 2023-03-24 failed.
2. the data from before the backfill now looks wildly inflated. I think something is causing the old data to get duplicated with every run of the job.

Ah! I deleted the old data files, but I forgot that I have to rename the loaded data to `data.gz`, or we get the data duplication.

In [47]:
!hdfs dfs -ls /user/analytics-product/wikipediapreview_stats/daily

Found 2 items
-rwxrwxr-x   3 neilpquinn-wmf    analytics-privatedata-users     504405 2023-04-18 03:12 /user/analytics-product/wikipediapreview_stats/daily/000000_0.gz
-rw-r--r--   3 analytics-product hdfs                          30356281 2023-04-18 11:38 /user/analytics-product/wikipediapreview_stats/daily/data.gz


I'm pretty sure `000000_0.gz` is the data I loaded in the first place, and it gets re-incorporated into `data.gz` every time the job runs. So I can just delete `data.gz`, rename `000000_0.gz` to `data.gz`, and then re-run the backfill.
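
To make the suspected mechanism concrete, here is a toy model of it. This is not the job's actual code. It assumes that each run reads the whole table (every file in the directory), adds the new day, and overwrites only `data.gz`, leaving any other file in place. The function names and the one-row stand-ins for the data are made up for the illustration.

```python
from collections import Counter


def run_daily_job(directory, new_rows):
    """Toy model of one run: read every file, add the new day, rewrite data.gz."""
    # An external table is the union of all files under its location.
    table = [row for rows in directory.values() for row in rows]
    # Assumption: only data.gz is overwritten; other files are left in place.
    directory["data.gz"] = table + new_rows


def copies_after(directory, runs):
    """Run the toy job `runs` times and count each row's copies in the table."""
    for n in range(runs):
        run_daily_job(directory, [f"day-{n}"])
    return Counter(row for rows in directory.values() for row in rows)


# What happened: the loaded data kept its Hive-generated file name.
leftover = copies_after({"000000_0.gz": ["old"]}, runs=3)
# The fix: the loaded data is renamed to data.gz before the job runs.
renamed = copies_after({"data.gz": ["old"]}, runs=3)

print(leftover["old"], renamed["old"])  # 4 1
```

Under this model the old data gains one extra copy per run while each backfilled day appears only once, which matches the symptom of only the pre-backfill data looking inflated. The file sizes in the listing are at least consistent with it too: `data.gz` is about 60 times the size of `000000_0.gz` after roughly 60 daily runs.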

In [48]:
!hdfs dfs -rm /user/analytics-product/wikipediapreview_stats/daily/data.gz

23/04/18 21:34:58 INFO fs.TrashPolicyDefault: Moved: 'hdfs://analytics-hadoop/user/analytics-product/wikipediapreview_stats/daily/data.gz' to trash at: hdfs://analytics-hadoop/user/neilpquinn-wmf/.Trash/Current/user/analytics-product/wikipediapreview_stats/daily/data.gz1681853698079


In [49]:
!hdfs dfs -mv /user/analytics-product/wikipediapreview_stats/daily/000000_0.gz /user/analytics-product/wikipediapreview_stats/daily/data.gz

In [50]:
!hdfs dfs -ls /user/analytics-product/wikipediapreview_stats/daily

Found 1 items
-rwxrwxr-x   3 neilpquinn-wmf analytics-privatedata-users     504405 2023-04-18 03:12 /user/analytics-product/wikipediapreview_stats/daily/data.gz


In [54]:
wmf.spark.create_session()

23/04/18 21:41:15 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/04/18 21:41:15 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/04/18 21:41:15 WARN Utils: Service 'sparkDriver' could not bind on port 12001. Attempting port 12002.
23/04/18 21:41:15 WARN Utils: Service 'sparkDriver' could not bind on port 12002. Attempting port 12003.
23/04/18 21:41:15 WARN Utils: Service 'sparkDriver' could not bind on port 12003. Attempting port 12004.
23/04/18 21:41:15 WARN Utils: Service 'sparkDriver' could not bind on port 12004. Attempting port 12005.
23/04/18 21:41:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/18 21:41:15 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/04/18 21:41:15 WARN Utils: Service 'SparkUI' could not bind on port 4042. Att

The data in the table looks correct.

In [55]:
daily_counts = wmf.spark.run("""
    SELECT
        year,
        month,
        day,
        SUM(previews) AS previews,
        SUM(pageviews) AS pageviews
    FROM wmf_product.wikipediapreview_stats
    GROUP BY
        year,
        month,
        day
""")

                                                                                

In [58]:
daily_counts = daily_counts.sort_values(["year", "month", "day"])
daily_counts.head()

Unnamed: 0,year,month,day,previews,pageviews
578,2020,10,20,68,1
732,2020,10,21,22,2
247,2020,10,22,9,0
546,2020,10,23,36,0
801,2020,10,26,93,1


In [59]:
daily_counts.tail()

Unnamed: 0,year,month,day,previews,pageviews
776,2023,1,18,4229,134
103,2023,1,19,6928,221
64,2023,1,20,3875,140
352,2023,1,21,2211,121
690,2023,1,22,1820,70


I kill the Oozie job using `oozie job -kill 0174537-220913162928808-oozie-oozi-C` and deploy the updated job:
```
neilpquinn-wmf@stat1005:~/product_analytics_jobs$ ./deploy-oozie-job wikipediapreview_stats --production
The HDFS job directory will be hdfs:///user/analytics-product/jobs/wikipediapreview_stats
Removing old job files in the job directory...
Creating the job directory...
Putting new job files into the job directory...
Submitting the job...
job: 0174858-220913162928808-oozie-oozi-C
```

The first two daily tasks have succeeded, and everything looks good!

In [63]:
!hdfs dfs -ls /user/analytics-product/wikipediapreview_stats/daily

Found 1 items
-rw-r--r--   3 analytics-product hdfs     509120 2023-04-18 21:55 /user/analytics-product/wikipediapreview_stats/daily/data.gz


In [64]:
wmf.spark.run("""
    SELECT
        year,
        month,
        day,
        SUM(previews) AS previews,
        SUM(pageviews) AS pageviews
    FROM wmf_product.wikipediapreview_stats
    WHERE
        year = 2023
        AND month = 1
        AND day >= 18
    GROUP BY
        year,
        month,
        day
""").sort_values(["year", "month", "day"])

                                                                                

Unnamed: 0,year,month,day,previews,pageviews
6,2023,1,18,4229,134
1,2023,1,19,6928,221
0,2023,1,20,3875,140
2,2023,1,21,2211,121
5,2023,1,22,1820,70
3,2023,1,23,1785,92
4,2023,1,24,1563,76
