# **GHTorrent Data Analytics with PySpark RDD: An unstructured case study**



##### source 1: https://ghtorrent.org
##### source 2: https://ghtorrent.org/downloads.html


In [3]:
########## ONLY in Colab ##########
!pip3 install pySpark


########## ONLY in Colab ##########

Collecting pySpark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pySpark
  Building wheel for pySpark (setup.py) ... [?25l[?25hdone
  Created wheel for pySpark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=c11c6c3393322a7c6b685b75ad333fadb840c16dcce892c55be6a71cb3e5e682
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pySpark
Installing collected packages: pySpark
Successfully installed pySpark-3.5.1


In [None]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [15]:
from pyspark import SparkContext, SparkConf

# Initializing Spark
conf = SparkConf().setAppName("GHTorrent_Pyspark").setMaster('local[*]') #num of cores *
sc = SparkContext(conf=conf)


In [17]:
print(sc)

<SparkContext master=local[*] appName=GHTorrent>


In [18]:
########## ONLY in Colab ##########
from google.colab import drive
drive.mount('/content/drive')
########## ONLY in Colab ##########

Mounted at /content/drive


In [19]:
# Read and Load Data to Spark

rdd = sc.textFile("/content/drive/MyDrive/Colab Notebooks/ghtorrent-logs.txt.gz")

In [27]:
# Repartition and Cache Data:
rdd = rdd.repartition(10)


print(sc.defaultParallelism)
print(rdd.getNumPartitions())


rdd.persist(StorageLevel.MEMORY_AND_DISK)

2
10


MapPartitionsRDD[31] at coalesce at NativeMethodAccessorImpl.java:0

## Question 1: Count the number of records and get twenty records randomly.


In [29]:
rdd.count()

9669788

In [69]:
#twenty records

rdd.takeSample(False, 20)

['DEBUG, 2017-03-23T09:49:36+00:00, ghtorrent-20 -- api_client.rb: Sleeping for 639 seconds',
 'INFO, 2017-03-23T12:48:49+00:00, ghtorrent-38 -- retriever.rb: Added issue_event SemsProject/MOST 4->1008591938',
 'DEBUG, 2017-03-23T10:10:22+00:00, ghtorrent-41 -- ghtorrent.rb: Repo androidfanatic/docker-asterisk exists',
 'DEBUG, 2017-03-23T12:04:38+00:00, ghtorrent-37 -- ght_data_retrieval.rb: Processing event: PushEvent-5531005223',
 'INFO, 2017-03-23T10:00:26+00:00, ghtorrent-35 -- api_client.rb: Successful request. URL: https://api.github.com/repos/Raphael-Herdlicka/PhysicsSandbox/issues/3/labels?per_page=100, Remaining: 4957, Total: 77 ms',
 'DEBUG, 2017-03-23T11:02:59+00:00, ghtorrent-8 -- retriever.rb: issues cbeust/testng-eclipse -> 26 exists',
 'DEBUG, 2017-03-24T12:52:16+00:00, ghtorrent-49 -- ghtorrent.rb: Transaction committed (62 ms)',
 'DEBUG, 2017-03-23T11:15:22+00:00, ghtorrent-33 -- ghtorrent.rb: User heyilin416 exists',
 'DEBUG, 2017-03-23T10:07:40+00:00, ghtorrent-32 -

# **GHTorrent data format**
Every line of this log file includes:
1.   Logging level, one of `DEBUG`, `INFO`, `WARN`, `ERROR`
2.   A timestamp
3.   The downloader id
4.   The logging stage including at least one of the following names:
    *   `event_processing`
    *   `ght_data_retrieval`
    *   `api_client`
    *   `retriever`
    *   `ghtorrent`

## Question 2: Get the number of lines with both `Transaction` or `Repo` information.

In [65]:
#
import re
def filtered_rdd(line):
  return re.compile('\w+').findall(line.lower())

filtered_rdd('Salah ?Eddine!, is CurrEntly learning! BIG dAtA;, Analysis%')


['salah', 'eddine', 'is', 'currently', 'learning', 'big', 'data', 'analysis']

In [64]:
rdd_transaction = rdd.filter(lambda line: "transaction" in filtered_rdd(line))
rdd_repo = rdd.filter(lambda line: "repo" in filtered_rdd(line))

new_rdd = rdd_transaction.intersection(rdd_repo)

new_rdd.count()

19

In [67]:
new_rdd.collect()

['DEBUG, 2017-03-23T13:03:33+00:00, ghtorrent-42 -- ghtorrent.rb: Repo jwpttcg66/redis-game-transaction exists',
 'DEBUG, 2017-03-23T09:13:26+00:00, ghtorrent-9 -- ghtorrent.rb: Association of commit 5793a7df39a26b46082cafb59f287e2dc2cf9796 with repo xuminwlt/tcc-transaction exists',
 'DEBUG, 2017-03-23T09:13:17+00:00, ghtorrent-9 -- retriever.rb: Repo xuminwlt -> tcc-transaction exists',
 'DEBUG, 2017-03-23T09:13:16+00:00, ghtorrent-9 -- ghtorrent.rb: Repo changmingxie/tcc-transaction exists',
 'INFO, 2017-03-23T09:13:17+00:00, ghtorrent-9 -- ghtorrent.rb: Added repo xuminwlt/tcc-transaction',
 'DEBUG, 2017-03-23T09:13:17+00:00, ghtorrent-9 -- retriever.rb: Repo changmingxie -> tcc-transaction exists',
 'DEBUG, 2017-03-23T09:13:17+00:00, ghtorrent-9 -- ghtorrent.rb: Repo changmingxie/tcc-transaction exists',
 'DEBUG, 2017-03-23T09:13:27+00:00, ghtorrent-9 -- ghtorrent.rb: Repo xuminwlt/tcc-transaction exists',
 'DEBUG, 2017-03-23T11:09:37+00:00, ghtorrent-1 -- ghtorrent.rb: Repo pilky

## Question 3: Get the number of lines including `web link` for `WARN` logging levels.

In [92]:
def find_links(line):
  return re.findall(r'http[s]?://', line)

find_links("https://")

['https://']

In [None]:
rdd.filter(lambda line: line.split(',')[0] == 'WARN') \
              .filter(lambda line: len(find_links(line)) > 1 ) \
              .collect()


## Question 4: What is the most active `downloader id` for `Failed` connections?

In [115]:
rdd_Failed = rdd.filter(lambda line: 'failed' in filtered_rdd(line))

rdd_active_ids = rdd_Failed.map(lambda line: (line.replace(': ', ',').split(',')[2].split('-')[1],1))

rdd_active_ids.reduceByKey(lambda a,b: a+b).sortBy(lambda x: x[1], ascending=False).first()



('13 ', 79654)

## Question 5: What is the most active `repository`?

In [122]:
def get_repo(line):
    return re.compile(' \w+ ').findall(line.lower())


rdd.filter(lambda line: " repo " in get_repo(line)) \
   .map(lambda line: line.lower().split('repo')[1].split(' ')[1]) \
   .map(lambda repo: (repo, 1)) \
   .reduceByKey(lambda a,b: a+b) \
   .sortBy(lambda x: x[1], ascending=False) \
   .first()


('ovyx/hammerheadn', 22447)