### Update orphaning

In [None]:
import datetime as dt
import urllib2
import ujson as json

%pylab inline

Get the time when this job was started (for debugging purposes).

In [None]:
# Wall-clock time at which the job began; subtracted from the end time in the last cell.
starttime = dt.datetime.now()
starttime

Declare the channel to look at.

In [None]:
# Release channel to analyze; used both to filter the dataset and to build the version.txt URL.
channelToProcess = "release"

In [None]:
# Display the default parallelism of `sc` (presumably the notebook's SparkContext); informational only.
sc.defaultParallelism

In [None]:
today = dt.date.today()

# Step back to the most recent Wednesday (weekday() == 2), staying on today when
# today is itself a Wednesday. That date names the report and anchors the start
# of the date range used below.
currentWeekday = today.weekday()
lastWednesday = today - dt.timedelta(days=(currentWeekday - 2) % 7)

# The range starts 17 days before that Wednesday.
minRange = lastWednesday - dt.timedelta(days=17)
minRangeStr = minRange.strftime("%Y%m%d")
reportDateStr = lastWednesday.strftime("%Y%m%d")
[lastWednesday, minRangeStr, reportDateStr]

The longitudinal dataset can be accessed as a Spark [DataFrame](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame), which is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.

We need to ensure that we're working with a longitudinal dataset that has been generated at least 15 days after our minimum date range to allow sufficient time for users to submit their telemetry data.

In [None]:
# List the tables known to the SQL context and take the last one returned; the
# final eight characters of its name are read as a YYYYMMDD date.
# NOTE(review): this presumes the last listed table is the newest longitudinal
# dataset -- confirm the catalog ordering.
tablesList = sqlContext.sql("show tables").select("tableName").collect()
lastTableName = tablesList[-1][0]
lastTableDateStr = lastTableName[-8:]

# The dataset must be dated at least 15 days after the start of the range.
# Both sides are compared as text, which orders them by date as long as the
# table suffix really is a zero-padded YYYYMMDD string.
minTableDate = minRange + dt.timedelta(days=15)
minTableDateStr = minTableDate.strftime("%Y%m%d")
staleDatasetMessage = ("The most recent longitudinal dataset wasn't created recently enough for this job to run. lastTableDateStr: "
                       + lastTableDateStr
                       + ", minTableDateStr: "
                       + minTableDateStr)
assert minTableDateStr <= lastTableDateStr, staleDatasetMessage

# Load the whole table as a DataFrame and display the query that was issued.
sqlStr = "SELECT * FROM " + lastTableName
frame = sqlContext.sql(sqlStr)
sqlStr

Restrict the dataframe to the desired channel.

In [None]:
# Keep only rows whose normalized_channel equals the configured channel.
channelSubset = frame.filter(frame.normalized_channel == channelToProcess)

Restrict the dataframe to the desired data.

In [None]:
# Columns needed by the analysis. The nested fields "build.version" and
# "settings.update.enabled" are read by later cells as `version` and `enabled`.
selectedColumns = [
    "subsession_start_date",
    "subsession_length",
    "update_check_code_notify",
    "update_check_no_update_notify",
    "build.version",
    "settings.update.enabled",
]
dataSubset = channelSubset.select(*selectedColumns)

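Restrict the data to records whose first listed subsession start date falls on or after the minimum date computed above (`minRange`, 17 days before `lastWednesday`). Note that no upper bound is applied, and records with a missing or unparseable start date are dropped.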

In [None]:
def start_date_filter(d):
    """Return True when the record's first subsession started on or after ``minRange``.

    ``d`` is a row exposing ``subsession_start_date``, a sequence of date
    strings that begin with ``YYYY-MM-DD``. Only the first entry is examined.
    Rows whose date is missing, empty or unparseable are rejected instead of
    raising, so one malformed record cannot abort the whole Spark job.
    """
    try:
        first_start = d.subsession_start_date[0][:10]
        return minRange <= dt.datetime.strptime(first_start, "%Y-%m-%d").date()
    except (ValueError, TypeError, IndexError):
        # ValueError: malformed date string; TypeError: the list or its first
        # entry is None; IndexError: the list of start dates is empty.
        return False

# Apply the Python predicate row by row on the underlying RDD; the result is
# cached because it is counted here and mapped again in a later cell.
dateFiltered = dataSubset.rdd.filter(start_date_filter).cache()

# Count the surviving records and report how long the count took.
%time dateFiltered.count()

Analyze the data to determine the number of users on a current version of Firefox vs. a version that's out of date. A "user on a current version" is defined as being either on the version found in the version.txt file on hg.mozilla.org, or the two versions just prior to it. Versions prior to FF 42 are ignored since unified telemetry was not turned on by default on earlier versions.

In [None]:
# Fetch the current version string for this channel from hg.mozilla.org and keep
# only its major version number (the text before the first ".").
# NOTE(review): this is fetched over plain HTTP; consider HTTPS if the host supports it.
versionResponse = urllib2.urlopen("http://hg.mozilla.org/releases/mozilla-" + channelToProcess + "/raw-file/tip/browser/config/version.txt")
try:
    latestVersion = versionResponse.read()
finally:
    # urllib2 responses cannot be used in a `with` statement, so close explicitly.
    versionResponse.close()
latestVersion = int(latestVersion.split(".")[0])

def status_mapper(d):
    """Classify a record by how current its first listed Firefox version is.

    Returns a ``(status, d)`` pair, where ``status`` is one of:

    * ``"none-version"`` -- the first version entry is None.
    * ``"ignore-version-too-low"`` -- major version below 42.
    * ``"up-to-date"`` -- major version within two of ``latestVersion``.
    * ``"out-of-date"`` -- an older version that accumulated at least 7200
      seconds (2 hours) of subsession time.
    * ``"out-of-date-not-run-long-enough"`` -- an older version that did not
      reach 2 hours, or whose subsessions reach back more than 12 weeks
      before ``lastWednesday`` before doing so.
    * ``"value-error"`` -- the version string's major part is not an integer.
    """
    try:
        first_version = d.version[0]
        if first_version is None:
            return ("none-version", d)

        major = int(first_version.split(".")[0])
        if major < 42:
            return ("ignore-version-too-low", d)
        if major >= latestVersion - 2:
            return ("up-to-date", d)

        # Orphaned version: add up the lengths of the leading run of subsessions
        # that share this exact version string, until 2 hours are reached.
        cutoff = lastWednesday - dt.timedelta(weeks=12)
        seconds = 0
        index = 0
        while seconds < 7200 and index < len(d.version) and d.version[index] == first_version:
            try:
                started = dt.datetime.strptime(d.subsession_start_date[index][:10], "%Y-%m-%d").date()
                if started < cutoff:
                    return ("out-of-date-not-run-long-enough", d)
                seconds += d.subsession_length[index]
            except (ValueError, TypeError):
                # A subsession with a malformed/missing date or length is skipped.
                pass
            index += 1

        if seconds < 7200:
            return ("out-of-date-not-run-long-enough", d)
        return ("out-of-date", d)
    except ValueError:
        return ("value-error", d)
        
# Tag every date-filtered record with its version status; cached because it is
# counted here and filtered again in the next cell.
statuses = dateFiltered.map(status_mapper).cache()
# Count records per status key, then serialize the counts for display.
up_to_date_results = statuses.countByKey()
up_to_date_json_results = json.dumps(up_to_date_results, ensure_ascii=False)
up_to_date_json_results

For people who are out-of-date, determine how many of them have updates disabled:

In [None]:
# Keep only the records classified exactly as "out-of-date". The status element
# is compared directly instead of testing membership over the whole
# (status, ping) pair, which would also compare the string against the ping.
out_of_date_statuses = statuses.filter(lambda p: p[0] == "out-of-date")

def update_disabled_mapper(d):
    """Label a ``(status, ping)`` pair by the ping's first ``enabled`` value.

    Returns ``(label, ping)`` where ``label`` is:

    * ``"none-update-enabled"`` -- the ping is None, or ``enabled`` is None,
      empty, or starts with None;
    * ``"update-enabled"`` -- the first ``enabled`` entry equals True;
    * ``"update-disabled"`` -- anything else.
    """
    _, ping = d
    # `not ping.enabled` covers both None and an empty list, so indexing below
    # cannot raise IndexError.
    if ping is None or not ping.enabled or ping.enabled[0] is None:
        return ("none-update-enabled", ping)
    # Explicit comparison kept: only a value equal to True counts as enabled.
    if ping.enabled[0] == True:
        return ("update-enabled", ping)
    return ("update-disabled", ping)
    
# Label each out-of-date record by its update setting, count records per label,
# and serialize the counts for display.
update_enabled_disabled_statuses = out_of_date_statuses.map(update_disabled_mapper)
update_enabled_disabled_results = update_enabled_disabled_statuses.countByKey()
update_enabled_disabled_json_results = json.dumps(update_enabled_disabled_results, ensure_ascii=False)
update_enabled_disabled_json_results

Focus on orphaned users who have updates enabled.

In [None]:
# Keep only the records labelled "update-enabled". The label element is compared
# directly instead of testing membership over the whole (label, ping) pair, which
# would also compare the string against the ping. Cached because two later
# cells map over this RDD.
update_enabled_statuses = update_enabled_disabled_statuses.filter(lambda p: p[0] == "update-enabled").cache()

For people who are out-of-date and have updates enabled, determine the distribution across Firefox versions.

In [None]:
def version_mapper(d):
    """Re-key a ``(status, ping)`` pair by the ping's first listed version.

    Returns ``(version, ping)``; the key is the string ``"-1"`` when the first
    version entry is None.
    """
    _, ping = d
    first_version = ping.version[0]
    key = "-1" if first_version is None else first_version
    return (key, ping)
    
# Key each update-enabled orphaned record by version, count records per version,
# and serialize the counts for display.
orphaned_by_versions = update_enabled_statuses.map(version_mapper)
orphaned_by_versions_results = orphaned_by_versions.countByKey()
orphaned_by_versions_json_results = json.dumps(orphaned_by_versions_results, ensure_ascii=False)
orphaned_by_versions_json_results

For people who are out-of-date and have updates enabled, determine what the update check returns.

In [None]:
def update_check_code_notify_mapper(d):
    """Reduce a ``(status, ping)`` pair to a single update-check code.

    Scans the entries of ``ping.update_check_code_notify`` in order (each entry
    appears to be a list of per-code counts) and returns the position of the
    first non-zero count found. If every count is zero, returns 0 when the
    first ``update_check_no_update_notify`` entry is positive. Returns -1 when
    the ping or its check-code data is None, or when nothing else matched.
    """
    _, ping = d
    if ping is None or ping.update_check_code_notify is None:
        return -1

    for histogram in ping.update_check_code_notify:
        for code, count in enumerate(histogram):
            if count != 0:
                return code

    no_update = ping.update_check_no_update_notify
    if no_update is not None and no_update[0] > 0:
        return 0
    return -1

# Reduce each update-enabled orphaned record to one check code, count how often
# each code occurs, and serialize the counts for display.
update_check_code_notify_statuses = update_enabled_statuses.map(update_check_code_notify_mapper)
update_check_code_notify_results = update_check_code_notify_statuses.countByValue()
update_check_code_notify_json_results = json.dumps(update_check_code_notify_results, ensure_ascii=False)
update_check_code_notify_json_results

Write results to JSON.

In [None]:
# Wrap each result in a single-key dict so every entry of the output is labelled.
latest_version_object = {"latest-version": latestVersion}
up_to_date_object = {"up-to-date": up_to_date_results}
update_enabled_disabled_object = {"update-enabled-disabled": update_enabled_disabled_results}
update_check_code_notify_object = {"update-check-code-notify": update_check_code_notify_results}
orphaned_by_versions_object = {"orphaned-by-versions": orphaned_by_versions_results}

# The report is a JSON array of those labelled objects, in this fixed order.
final_results = [up_to_date_object, update_enabled_disabled_object, update_check_code_notify_object, latest_version_object, orphaned_by_versions_object]
final_results_json =  json.dumps(final_results, ensure_ascii=False)
final_results_json

Finally, store the output in the local directory to be uploaded automatically once the job completes. The file will be stored at:

https://analysis-output.telemetry.mozilla.org/SPARKJOBNAME/data/FILENAME

In [None]:
# The output file is named after the report date (the Wednesday computed earlier).
filename = "./output/" + reportDateStr + ".json"

# Write the serialized report; the file is closed when the `with` block exits.
with open(filename, 'w') as f:
    f.write(final_results_json)

filename

Get the time when this job ended (for debugging purposes):

In [None]:
# Wall-clock time at which the job finished.
endtime = dt.datetime.now()
endtime

In [None]:
# Total running time of the job, as a timedelta.
difference = endtime - starttime
difference