In [141]:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, split, udf, lit, regexp_replace

## Task 1

### Goal 1 ( Pandas Version )

##### Read the space-delimited events file

In [69]:
# Load the raw event log; fields in events.txt are separated by single spaces.
df = pd.read_csv(filepath_or_buffer='./events.txt', delimiter=' ')
df

Unnamed: 0,user_uid,category,event_type
0,123,fashion,teaser_displayed
1,123,sports,teaser_opened
2,123,politics,teaser_opened
3,456,technology,teaser_opened
4,456,technology,teaser_opened
5,457,technology,teaser_opened


##### Find all the "teaser_opened" records

In [65]:
# Keep only the rows whose event_type is exactly 'teaser_opened'.
# Plain equality is used instead of .str.match(): match() interprets its
# argument as a regular expression anchored only at the start of the string,
# so it would also accept longer values that merely begin with
# 'teaser_opened'. Equality also yields False (not NaN) for missing values,
# which keeps the boolean mask valid.
teaseropdf = df[df['event_type'] == 'teaser_opened']
teaseropdf

Unnamed: 0,user_uid,category,event_type
1,123,sports,teaser_opened
2,123,politics,teaser_opened
3,456,technology,teaser_opened
4,456,technology,teaser_opened
5,457,technology,teaser_opened


##### Group all the teaser_opened records w.r.t  userid and category

In [67]:
# Count the teaser_opened events for every (category, user_uid) pair.
opened_by_category_and_user = teaseropdf.groupby(['category', 'user_uid'])
opened_by_category_and_user['event_type'].count()

category    user_uid
politics    123         1
sports      123         1
technology  456         2
            457         1
Name: event_type, dtype: int64

### Goal 2 ( PySpark Version )

In [135]:
# Start (or reuse) a local Spark session for this notebook.
# SparkSession comes from pyspark.sql and must be imported in the imports cell.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Group Task")
    .getOrCreate()
)

# events2.txt stores `category` as a bracketed, comma-separated list
# (e.g. "[fashion,sports]"); the Bonus section works on this frame.
# The flat, one-category-per-row variant of the data is ./events.txt.
sparkdf = spark.read.csv("./events2.txt", header=True, sep=' ')

sparkdf.take(10)

[Row(user_uid=u'123', category=u'[fashion,sports]', event_type=u'teaser_displayed'),
 Row(user_uid=u'123', category=u'[fashion,politics]', event_type=u'teaser_opened'),
 Row(user_uid=u'456', category=u'[technology]', event_type=u'teaser_opened'),
 Row(user_uid=u'456', category=u'[sports,technology]', event_type=u'teaser_opened'),
 Row(user_uid=u'457', category=u'[technology]', event_type=u'teaser_opened')]

In [134]:
# Goal 2 works on the flat events.txt file (one category per row). It is read
# here explicitly because `sparkdf` holds events2.txt, whose `category` column
# contains bracketed lists and would therefore be grouped as whole strings.
goal2_events = spark.read.csv("./events.txt", header=True, sep=' ')

# Count teaser_opened events per (category, user_uid) pair.
(goal2_events
    .filter("event_type = 'teaser_opened'")
    .groupby('category', 'user_uid')
    .count()
    .show())

+----------+--------+-----+
|  category|user_uid|count|
+----------+--------+-----+
|technology|     456|    2|
|  politics|     123|    1|
|technology|     457|    1|
|    sports|     123|    1|
+----------+--------+-----+



### Bonus ( PySpark version ) 
###### uses events2.txt

In [136]:
# Keep only teaser_opened events first, so the split/explode below runs on
# fewer rows. The result gets its own name so `sparkdf` stays the raw frame
# and this cell can be re-run safely.
opened_events = sparkdf.filter("event_type = 'teaser_opened'")

# Strip the surrounding brackets: "[fashion,politics]" -> "fashion,politics".
# The pattern is a raw string so the backslashes reach the regex engine
# unchanged, and a character class so it only ever matches a real bracket.
categories_csv = opened_events.withColumn(
    'category', regexp_replace('category', r'[\[\]]', '')
)

# Produce one row per (user_uid, category) and count the events of each pair.
(categories_csv
    .select(col("user_uid"), explode(split(col("category"), ",")).alias('category'))
    .groupby('user_uid', 'category')
    .count()
    .show())


+--------+----------+-----+
|user_uid|  category|count|
+--------+----------+-----+
|     457|technology|    1|
|     456|technology|    2|
|     456|    sports|    1|
|     123|   fashion|    1|
|     123|  politics|    1|
+--------+----------+-----+



### Task 2:

The queries below are untested; they are based on a basic understanding of the problem at hand.

##### Goal 1:

-- Session, user and top-news-card-view counts for sessions present in both tables.
-- session_id and user_id exist on both sides of the join, so they are qualified
-- with the sbs alias to avoid an ambiguous-column error.
-- NOTE(review): event_name is assumed to exist in only one of the two tables; qualify it if not.
-- NOTE(review): the second column is a count of users although it is aliased user_id.
SELECT
    COUNT(DISTINCT sbs.session_id) AS session_count,
    COUNT(DISTINCT sbs.user_id) AS user_id,
    SUM(CASE WHEN event_name = 'top_news_card_viewed' THEN 1 ELSE 0 END) AS tn_count
FROM
    source_blacklisted_sessions sbs
INNER JOIN
    tn_count_pdt tnc
ON
    tnc.session_id = sbs.session_id
AND
    tnc.user_id = sbs.user_id


##### Goal 2:

* **CASE** is used 2 times as a subquery for count. This can be reduced with a **WHERE** clause
* No need to use **DISTINCT** when you already have a condition **event_name='article_shared'**

A simpler version of the query is written below.

**Note:** This query is untested; it is only meant to illustrate how the query should look, based on my understanding.

In [None]:
-- Totals for 'article_shared' events recorded outside the listed countries.
-- Both filters are row-level predicates, so they live in WHERE; HAVING may only
-- reference grouped or aggregated columns, and country is neither.
-- No GROUP BY is used: grouping by event_id would return one row per event id
-- instead of the overall totals.
SELECT
    COUNT(event_id) AS share_count,
    -- NOTE(review): this counts share rows, not unique users; use
    -- COUNT(DISTINCT user_id) if the number of distinct sharers is wanted.
    COUNT(user_id) AS user_share_count
FROM
    events_table
WHERE
    event_name = 'article_shared'
    AND country NOT IN ('de', 'fr', 'be', 'gb');
