This dataset, hosted by Databricks, is fully described at https://meta.wikimedia.org/wiki/Research:Wikipedia_clickstream

"The data contains counts of (referer, resource) pairs extracted from the request logs of English Wikipedia. When a client requests a resource by following a link or performing a search, the URI of the webpage that linked to the resource is included with the request in an HTTP header called the "referer". This data captures 22 million (referer, resource) pairs from a total of 3.2 billion requests collected during the month of February 2015."

Let's start by just taking a look at the file.

In [3]:
%fs ls /databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed

path,name,size
dbfs:/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed/2015_2_clickstream.tsv,2015_2_clickstream.tsv,1322171548


We can see that the data is saved as a TSV that's over 1 GB. Let's take a look at the first chunk of data.

In [5]:
%fs head /databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed/2015_2_clickstream.tsv

Let's load the data into a dataframe. sqlContext is our entry point for using structured data with Spark. The file contains a header, as we saw above, and is tab-separated. We'll set inferSchema to "true" and let Spark try to determine the best datatypes. Note that the option name is inferSchema: a misspelled option such as infer_schema is silently ignored, and every column is then read as a string.

In [7]:
df = sqlContext.read.format('csv')\
      .options(header='true', delimiter='\t', inferSchema='true')\
        .load("dbfs:///databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed")
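
If the column types are known in advance, the schema can be declared up front instead of inferred, which avoids the extra pass over the file that inference requires. The following is a minimal sketch, not this notebook's method. The column types are assumptions read off the sample rows shown further down (for instance, prev_id appears as 64486.0, so it is declared DOUBLE), and load_clickstream is a hypothetical helper. It relies on DataFrameReader.schema accepting a DDL-formatted string, which is available in Spark 2.3 and later.

```python
# Column names come from the file header; the types are assumptions
# based on the sample rows, not something the dataset documents here.
CLICKSTREAM_COLUMNS = [
    ("prev_id", "DOUBLE"),
    ("curr_id", "INT"),
    ("n", "INT"),
    ("prev_title", "STRING"),
    ("curr_title", "STRING"),
    ("type", "STRING"),
]

# Backticks quote each column name so that names such as `type`
# cannot be mistaken for SQL keywords.
SCHEMA_DDL = ", ".join(f"`{name}` {dtype}" for name, dtype in CLICKSTREAM_COLUMNS)


def load_clickstream(sql_context, path):
    """Hypothetical helper: read the TSV with an explicit schema."""
    return (
        sql_context.read.format("csv")
        .options(header="true", delimiter="\t")
        .schema(SCHEMA_DDL)
        .load(path)
    )
```

In the notebook this would be called as load_clickstream(sqlContext, path) with the same path used above. With an explicit schema, n arrives as an integer and no later cast is needed.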

Let's take a look at our data now.

In [9]:
display(df.take(10))

prev_id,curr_id,n,prev_title,curr_title,type
,3632887,121,other-google,!!,other
,3632887,93,other-wikipedia,!!,other
,3632887,46,other-empty,!!,other
,3632887,10,other-other,!!,other
64486.0,3632887,11,!_(disambiguation),!!,other
2061699.0,2556962,19,Louden_Up_Now,!!!_(album),link
,2556962,25,other-empty,!!!_(album),other
,2556962,16,other-google,!!!_(album),other
,2556962,44,other-wikipedia,!!!_(album),other
64486.0,2556962,15,!_(disambiguation),!!!_(album),link


Let's check the schema to see if we have the correct data types.

In [11]:
df.printSchema()

We want the number of hits, n, to be an integer. If the schema shows it as a string, let's cast it explicitly; the cast is harmless if the column is already an integer.

In [13]:
df = df.withColumn("n", df["n"].cast("integer"))

In [14]:
df.printSchema()

We're going to use this dataframe multiple times, so let's cache it. Caching is lazy: the data is only materialized the first time an action, such as the count below, runs.

In [16]:
df.cache()

Let's check how many rows of data we have.

In [18]:
df.count()

Let's create a view then use a familiar SQL query to find the articles with the most hits in February 2015.

In [20]:
df.createOrReplaceTempView("wiki")

In [21]:
%sql

SELECT curr_title,
       sum(n) AS total_hits
  FROM wiki
WHERE curr_title != 'Main_Page'
GROUP BY curr_title
ORDER BY total_hits DESC
LIMIT 10;

curr_title,total_hits
87th_Academy_Awards,2559794
Fifty_Shades_of_Grey,2326175
Alive,2244781
Chris_Kyle,1709341
Fifty_Shades_of_Grey_(film),1683892
Deaths_in_2015,1614577
Birdman_(film),1545842
Islamic_State_of_Iraq_and_the_Levant,1406530
Stephen_Hawking,1384193
Academy_Awards,1354794


Let's import pyspark SQL functions and try to do the same query.

In [23]:
import pyspark.sql.functions as f

In [24]:
display(
    df.filter(df.curr_title != "Main_Page")
      .groupBy("curr_title")
      .agg(f.sum(df.n).alias("total_hits"))
      .orderBy(f.desc("total_hits"))
      .take(10)
)

curr_title,total_hits
87th_Academy_Awards,2559794
Fifty_Shades_of_Grey,2326175
Alive,2244781
Chris_Kyle,1709341
Fifty_Shades_of_Grey_(film),1683892
Deaths_in_2015,1614577
Birdman_(film),1545842
Islamic_State_of_Iraq_and_the_Levant,1406530
Stephen_Hawking,1384193
Academy_Awards,1354794


Let's take a look at where people are coming from. Google is by far the largest single referrer, with roughly 1.5 billion hits.

In [26]:
%sql

SELECT prev_title AS refer,
       sum(n) AS total_hits
  FROM wiki
 GROUP BY prev_title
 ORDER BY total_hits DESC
 LIMIT 10;

refer,total_hits
other-google,1496209976
other-empty,347693595
other-wikipedia,129772279
other-other,77569671
other-bing,65962792
other-yahoo,48501171
Main_Page,29923502
other-twitter,19241298
other-facebook,2314026
87th_Academy_Awards,1680675


In [27]:
display(
    df.groupBy('prev_title')
      .agg(f.sum(df.n).alias('total_hits'), df.prev_title.alias('refer'))
      .orderBy(f.sum(df.n).desc())
      .take(10)
)

prev_title,total_hits,refer
other-google,1496209976,other-google
other-empty,347693595,other-empty
other-wikipedia,129772279,other-wikipedia
other-other,77569671,other-other
other-bing,65962792,other-bing
other-yahoo,48501171,other-yahoo
Main_Page,29923502,Main_Page
other-twitter,19241298,other-twitter
other-facebook,2314026,other-facebook
87th_Academy_Awards,1680675,87th_Academy_Awards


As we can see above, most traffic reaches Wikipedia articles from outside Wikipedia. Let's find what percent of traffic comes from other Wikipedia pages. Note that the query does not exclude other-wikipedia, so those hits are counted as internal.

In [29]:
%sql
SELECT ((SELECT SUM(n)
          FROM wiki
          WHERE prev_title NOT IN ("other-empty", "other-google", "other-yahoo",
                                   "other-bing", "other-facebook", "other-twitter",
                                    "other-other")) / (SELECT SUM(n) FROM wiki)) * 100 AS percent_internal
                          

percent_internal
37.33018624438221


In [30]:
external = ["other-empty", "other-google", "other-yahoo", "other-bing", "other-facebook", "other-twitter", "other-other"]

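# Hits whose referrer is not in the external list, as a percentage of all hits
internal_hits = df.filter(~f.col('prev_title').isin(external)).agg(f.sum('n')).collect()[0][0]
total_hits = df.agg(f.sum('n')).collect()[0][0]
internal_hits / total_hits * 100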
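
As a rough sanity check on the 37.33% figure, all seven excluded referrers happen to appear in the top-10 referrer table above, so their totals can be summed by hand. The sketch below does that arithmetic in plain Python. The implied overall total is derived from the reported percentage rather than read from the data, so treat it as an approximation.

```python
# Totals copied from the top-10 referrer table for the seven excluded referrers
external_hits = {
    "other-google": 1496209976,
    "other-empty": 347693595,
    "other-other": 77569671,
    "other-bing": 65962792,
    "other-yahoo": 48501171,
    "other-twitter": 19241298,
    "other-facebook": 2314026,
}
percent_internal = 37.33018624438221  # result reported by the SQL query

external_total = sum(external_hits.values())
external_share = 1 - percent_internal / 100
# If external hits make up external_share of all hits, this is the total
implied_total = external_total / external_share

print(f"external hits:  {external_total:,}")
print(f"external share: {external_share:.2%}")
print(f"implied total:  {implied_total:,.0f}")
```

The implied total comes out near 3.28 billion, in line with the roughly 3.2 billion requests quoted in the dataset description.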

Let's take a look at the most popular pages again and see how often a hit on that page leads to another Wikipedia page.

In [32]:
%sql
SELECT curr_title,
       in_count,
       out_count,
       out_count / in_count AS ratio
FROM (SELECT curr_title,
             SUM(n) AS in_count
        FROM wiki
    GROUP BY curr_title) AS T
JOIN (SELECT prev_title,
             SUM(n) AS out_count
        FROM wiki
    GROUP BY prev_title) AS T2 ON T.curr_title = T2.prev_title
ORDER BY in_count DESC;


curr_title,in_count,out_count,ratio
Main_Page,127500620,29923502,0.2346929920811365
87th_Academy_Awards,2559794,1680675,0.656566504960946
Fifty_Shades_of_Grey,2326175,1146401,0.4928266360011607
Alive,2244781,3480,0.0015502625868625
Chris_Kyle,1709341,869974,0.5089528654610168
Fifty_Shades_of_Grey_(film),1683892,1375687,0.816968665448853
Deaths_in_2015,1614577,1135121,0.7030454416234098
Birdman_(film),1545842,796141,0.5150209400443253
Islamic_State_of_Iraq_and_the_Levant,1406530,794907,0.5651546714254229
Stephen_Hawking,1384193,644088,0.4653166140848855


In [33]:
in_df = df.groupBy("curr_title").agg(f.sum(df.n).alias("in_count")).cache()
out_df = df.groupBy("prev_title").agg(f.sum(df.n).alias("out_count")).cache()

flow_df = in_df.join(out_df, in_df.curr_title == out_df.prev_title)\
            .withColumn('ratio', out_df.out_count/in_df.in_count)\
              .orderBy(in_df.in_count.desc())\
                .select("curr_title", "in_count", "out_count", "ratio")

display(flow_df.take(10))

curr_title,in_count,out_count,ratio
Main_Page,127500620,29923502,0.2346929920811365
87th_Academy_Awards,2559794,1680675,0.656566504960946
Fifty_Shades_of_Grey,2326175,1146401,0.4928266360011607
Alive,2244781,3480,0.0015502625868625
Chris_Kyle,1709341,869974,0.5089528654610168
Fifty_Shades_of_Grey_(film),1683892,1375687,0.816968665448853
Deaths_in_2015,1614577,1135121,0.7030454416234098
Birdman_(film),1545842,796141,0.5150209400443253
Islamic_State_of_Iraq_and_the_Levant,1406530,794907,0.5651546714254229
Stephen_Hawking,1384193,644088,0.4653166140848855
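
The ratio column is simply out_count divided by in_count. The sketch below recomputes it in plain Python for a few rows copied from the table and flags pages that visitors rarely click onward from. The 0.01 threshold and the names sample_rows and LOW_RATIO are illustrative assumptions, not part of the analysis above.

```python
# (curr_title, in_count, out_count) copied from the result table
sample_rows = [
    ("Main_Page", 127500620, 29923502),
    ("87th_Academy_Awards", 2559794, 1680675),
    ("Alive", 2244781, 3480),
    ("Fifty_Shades_of_Grey_(film)", 1683892, 1375687),
]
LOW_RATIO = 0.01  # arbitrary, illustrative threshold

# Outbound clicks per inbound hit for each page
ratios = {title: out_count / in_count for title, in_count, out_count in sample_rows}
rarely_lead_onward = [title for title, ratio in ratios.items() if ratio < LOW_RATIO]

for title, ratio in ratios.items():
    print(f"{title}: {ratio:.4f}")
print("rarely lead onward:", rarely_lead_onward)
```

Of these rows, only Alive falls under the threshold: its outbound clicks amount to about 0.16% of its inbound hits, which makes it stand out among the most-visited pages.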


Let's make a Sankey diagram of traffic flowing into and out of the Apache Spark wiki page. For inbound traffic we keep only the external referrers in the external list defined earlier, and we drop paths with a count of 100 or less to declutter the diagram.

In [35]:
in_flows = df.filter((df.curr_title == 'Apache_Spark') & (df.n > 100) & (f.col('prev_title').isin(external)))\
              .select('prev_title','curr_title', 'n').collect()

out_flows = df.filter((df.prev_title == 'Apache_Spark') & (df.n > 100))\
              .select('prev_title','curr_title', 'n').collect()

import json

# Serialize each flow as a JSON array, which is also a valid JavaScript array literal
rows = ",".join(json.dumps([row[0], row[1], row[2]]) for row in in_flows + out_flows)

rows
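
Google's Sankey chart documentation warns that data containing cycles will not render, which may be why the inbound flows above are limited to external referrers: an article that both links to and is linked from Apache_Spark would create a loop. The sketch below is one way to check a list of flows for cycles before building the chart. has_cycle is a hypothetical helper, and the example flows and their counts are made up for illustration.

```python
from collections import defaultdict


def has_cycle(flows):
    """Return True if the directed graph of (source, target, weight) flows has a cycle."""
    graph = defaultdict(list)
    for source, target, _weight in flows:
        graph[source].append(target)

    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    state = defaultdict(int)

    def visit(node):
        state[node] = GREY
        for nxt in graph[node]:
            if state[nxt] == GREY:  # back edge to the current path: cycle found
                return True
            if state[nxt] == WHITE and visit(nxt):
                return True
        state[node] = BLACK
        return False

    # list(graph) takes a snapshot, since visit() may add keys to graph
    return any(state[node] == WHITE and visit(node) for node in list(graph))


# Hypothetical flows; the counts are invented for the example
acyclic = [("other-google", "Apache_Spark", 500), ("Apache_Spark", "Apache_Hadoop", 200)]
cyclic = acyclic + [("Apache_Hadoop", "Apache_Spark", 150)]

print(has_cycle(acyclic))
print(has_cycle(cyclic))
```

In the notebook, has_cycle(in_flows + out_flows) would apply the same check to the collected rows, since each Row holds exactly the three selected columns.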

In [36]:
content = """
<!DOCTYPE html>
<html>
<body>
<script type="text/javascript"
           src="https://www.google.com/jsapi?autoload={'modules':[{'name':'visualization','version':'1.1','packages':['sankey']}]}">
</script>

<div id="sankey_multiple" style="width: 800px; height: 640px;"></div>

<script type="text/javascript">
google.setOnLoadCallback(drawChart);
   function drawChart() {
    var data = new google.visualization.DataTable();
    data.addColumn('string', 'From');
    data.addColumn('string', 'To');
    data.addColumn('number', 'Weight');
    data.addRows([
"""
  
content = content + rows

content = content + """
    ]);
    // Set chart options
    var options = {
      width: 600,
      sankey: {
        link: { color: { fill: 'grey', fillOpacity: 0.3 } },
        node: { color: { fill: '#a61d4c' },
                label: { color: 'black' } },
      }
    };
    // Instantiate and draw our chart, passing in some options.
    var chart = new google.visualization.Sankey(document.getElementById('sankey_multiple'));
    chart.draw(data, options);
   }
</script>
  </body>
</html>"""

displayHTML(content)