# Microsoft Free and Open Source Software Analysis

This notebook contains an analysis of the free and open source contributions of Microsoft.

### Initialize PySpark

In [1]:
import sys, os, json
from frozendict import frozendict

# If there is no SparkSession, create the environment
try:
    sc and spark
except NameError as e:
    import findspark
    
    findspark.init()
    import pyspark
    import pyspark.sql
    
    sc = pyspark.SparkContext()
    spark = pyspark.sql.SparkSession(sc).builder.appName("Extract Network").getOrCreate()

### Load Github Archive Events

The GH or Github Archive is available at [https://www.gharchive.org/](https://www.gharchive.org/).

In [2]:
# Load all Github events for the year 2017
github_lines = sc.textFile("../data/githubarchive.org/2017*.json.gz")

# Apply the function to every record
def parse_json(line):
    record = None
    try:
        record = json.loads(line)
    except json.JSONDecodeError as e:
        sys.stderr.write(str(e))
        record = {"error": "Parse error"}
    return record

github_events = github_lines.map(parse_json)
github_events = github_events.filter(lambda x: "error" not in x)

### Inspecting a Record

Note that some elements are shared among all record types.

In [3]:
github_events.first()

{'id': '5738195424',
 'type': 'PushEvent',
 'actor': {'id': 19595939,
  'login': 'jbrain4',
  'display_login': 'jbrain4',
  'gravatar_id': '',
  'url': 'https://api.github.com/users/jbrain4',
  'avatar_url': 'https://avatars.githubusercontent.com/u/19595939?'},
 'repo': {'id': 83996538,
  'name': 'jbrain4/jbrain4.github.io',
  'url': 'https://api.github.com/repos/jbrain4/jbrain4.github.io'},
 'payload': {'push_id': 1696040405,
  'size': 2,
  'distinct_size': 1,
  'ref': 'refs/heads/master',
  'head': 'b0499d4f2bf35217940d59249b05ae2ee4ab5593',
  'before': '75e863eea1baf2b386edd415ba109b5472df5a74',
  'commits': [{'sha': '7084e2b3e459e1d05c7bd83f9bc67838e45b3ada',
    'author': {'name': 'Jason Brain',
     'email': 'bf324232954dc2e3935a75d8ebd889c8cd04a48e@outlook.com'},
    'message': 'GOT IT!!!',
    'distinct': False,
    'url': 'https://api.github.com/repos/jbrain4/jbrain4.github.io/commits/7084e2b3e459e1d05c7bd83f9bc67838e45b3ada'},
   {'sha': 'b0499d4f2bf35217940d59249b05ae2ee4ab5

### Splitting Record Types

These events are of varying categories, so we will have to split them into types. Event types for the [v3 API](https://developer.github.com/v3/) are available at [https://developer.github.com/v3/activity/events/types/](https://developer.github.com/v3/activity/events/types/).

In [4]:
#
# Split our events out by type
#

event_count = github_events.count()
print('Github events to start: {:,}'.format( event_count ))

Github events to start: 412,943,059


In [5]:
# Drop down user/repo and filter nulls in these fields
def drop_user_repo(x):
    y = None
    if x is not None:
        y = x.update(
            {
                "user": x["actor"]["login"] if "actor" in x and "login" in x["actor"] else None,
                "repo": x["repo"]["name"] if "repo" in x and "name" in x["repo"] else None
            }
        )
    return y
user_repo_events = github_events.map( drop_user_repo ) 

In [6]:
# Filter empty repos and users
user_repo_events = user_repo_events.filter( 
    lambda x: x["user"] is not None and x["repo"] is not None
)

In [7]:
# Count records now
user_repo_event_count = user_repo_events.count()
print('Github events with user/repo: {:,}'.format( user_repo_event_count ))

print('---------------------------------')

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 8762, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/rjurney/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/home/rjurney/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 1041, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 1041, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-6-e6ec4e438e22>", line 3, in <lambda>
TypeError: 'NoneType' object is not subscriptable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:467)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/rjurney/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/home/rjurney/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 1041, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/rjurney/spark/python/pyspark/rdd.py", line 1041, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-6-e6ec4e438e22>", line 3, in <lambda>
TypeError: 'NoneType' object is not subscriptable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
# These will be split into one RDD type each in a dictionary. 
# See https://developer.github.com/v3/activity/events/types/
event_type_names = [
    "CheckRunEvent",
    "CheckSuiteEvent",
    "CommitCommentEvent",
    "ContentReferenceEvent",
    "CreateEvent",
    "DeleteEvent",
    "DeploymentEvent",
    "DeploymentStatusEvent",
    "DownloadEvent",
    "FollowEvent",
    "ForkEvent",
    "ForkApplyEvent",
    "GitHubAppAuthorizationEvent",
    "GistEvent",
    "GollumEvent",
    "InstallationEvent",
    "InstallationRepositoriesEvent",
    "IssueCommentEvent",
    "IssuesEvent",
    "LabelEvent",
    "MarketplacePurchaseEvent",
    "MemberEvent",
    "MembershipEvent",
    "MilestoneEvent",
    "OrganizationEvent",
    "OrgBlockEvent",
    "PageBuildEvent",
    "ProjectCardEvent",
    "ProjectColumnEvent",
    "ProjectEvent",
    "PublicEvent",
    "PullRequestEvent",
    "PullRequestReviewEvent",
    "PullRequestReviewCommentEvent",
    "PushEvent",
    "ReleaseEvent",
    "RepositoryEvent",
    "RepositoryImportEvent",
    "RepositoryVulnerabilityAlertEvent",
    "SecurityAdvisoryEvent",
    "StatusEvent",
    "TeamEvent",
    "TeamAddEvent",
    "WatchEvent"
]

event_types = {}
for event_type_name in event_type_names:
    events_of_one_type = github_events.filter( lambda x[0]: "type" in x and x["type"] == x[1] )
    event_types[event_type_name] = events_of_one_type.filter( split_events )

    event_count = event_types[event_type_name].count()
    print('{} type count: {}'.format(event_type_name, event_count))

print('Bingo!')