In [1]:
import os
import sys
import boto
import ast
from datetime import datetime
from urllib.parse import urlparse
HOME = os.environ.get('HOME')
sys.path.append(HOME + "/comscore_processing")
from utils.log_processors import UserProcessor
#from RDLogging.logging import RDLogger
sc.addPyFile(HOME + "/comscore_processing/utils.zip")
sc.addPyFile(HOME + "/comscore_processing/RDLogging.zip")
from utils.constants import *
from utils.utils import (get_date_range, parser, 
                        apply_process_row, RDMSR_BUCKET,
                        get_log_date, s3_cleanup, get_work_list, combineLists,
                        parser,get_unprocesed_aggregates, check_aggregate_processed,
                        make_first_pass_path, eval_dict)
from utils.dynamo_writer import record_log_date
from boto.s3.connection import S3Connection
from boto.s3.connection import OrdinaryCallingFormat
from operator import add
import csv, io
from utils.config import config
from inspect import getmembers
from utils.dynamo_writer import (DynamoDBContextManager, 
        record_log_date,record_aggregate_dates)
from collections import defaultdict
from pyspark.sql.types import (StructType, StringType, ArrayType, MapType, 
        StructField)
from user_agents import parse
import re
from itertools import chain, groupby
from nltk.corpus import stopwords

In [9]:
class ProcessAggregates(object):
    """
    Base class for aggregating the session history of a single visitor.

    Instances are callable so they can be passed straight to a Spark RDD's
    ``map``.  For each record the sessions are stored on ``self.sessions``
    and every public (non-underscore) method is invoked; the collected
    outputs form the visitor's aggregate.  In this notebook the only
    subclass is ``AggregateProcessor``.
    """

    def __init__(self, month_year, dates, site, **kwargs):
        """
        :param month_year: month being aggregated (stored as given).
        :param dates: dates being processed (stored as given).
        :param site: site identifier, copied into every output record.
        :param kwargs: overrides/extensions for ``self.config``
            (default ``{'abandon': 10}``).
        """
        self.config = {'abandon': 10}
        self.month_year = month_year
        self.dates = dates
        self.site = site
        self.config.update(kwargs)

    def _process(self):
        """
        Call every public callable member and collect the outputs.

        A dict result is merged into ``self.data``; any other non-None
        result is stored under the method's name; None results are skipped.

        :returns: ``(ns_vid, str(self.data))``
        """
        for name, member in getmembers(self):
            if name.startswith('_') or not callable(member):
                continue
            result = member()
            if isinstance(result, dict):
                self.data.update(result)
            elif result is not None:
                self.data[name] = result
        return (self.data['ns_vid'], str(self.data))

    def __call__(self, record):
        """
        Callable executed on every record via Spark rdd's ``map`` method.

        :param record: two-tuple whose first element is the visitor id and
            whose second element is an iterable of session dictionaries.
        :returns: ``(ns_vid, str(data))`` on success.  ``None`` when a
            session has an unusable ``session_start_time`` or when any
            aggregation method raises, so callers can filter failures out.
        """
        self.data = {}
        self.last_error = None
        self.email_regex = re.compile(r"[^@]+@[^@]+\.[^@]+")
        self.data['ns_vid'] = record[0]
        self.data['site'] = self.site

        sessions = []
        for s in list(record[1]):
            start = s['session_start_time']
            if isinstance(start, str):
                start = datetime.strptime(start, '%Y-%m-%d %H:%M:%S')
                s['session_start_time'] = start
            if not isinstance(start, datetime):
                return None
            sessions.append(s)
        self.sessions = sorted(sessions, key=lambda x: float(x.get('session_start_utc')))

        try:
            return self._process()
        except Exception as e:
            # Best effort: a visitor whose aggregation fails is dropped (None)
            # instead of failing the whole job.  The input record is left
            # untouched; the error text is kept (as a plain string, so the
            # instance stays picklable) for local debugging.
            self.last_error = repr(e)
            return None

    def _path_terms(self, page_url):
        """
        Naive topic tokens taken from the path of a page URL.

        The path is split on slashes and dashes, lower-cased, de-duplicated
        and stripped of empty tokens and English stopwords.

        :param page_url: URL without scheme, e.g. ``'www.rd.com/a/b-c'``.
        :returns: sorted list of unique tokens (empty when there is no path).
        """
        path = urlparse('http://' + page_url).path
        if not path:
            return []
        # Build the stopword set once instead of once per token.
        stop_words = set(stopwords.words('english'))
        tokens = set(w.lower() for w in re.split("/|-", path[1:]) if w)
        return sorted(t for t in tokens if t not in stop_words)

class AggregateProcessor(ProcessAggregates):
    """
    Aggregates the session history of a single visitor id.

    Instances are callable (see ``ProcessAggregates.__call__``) and are meant
    to be mapped over a Spark RDD of ``(<visitor id>, [<session_dict>, ...])``:

    >>> ap = AggregateProcessor('201609', ['2015-12-29'], 'rd')
    >>> processed = visitor_rdd.map(ap).filter(lambda a: a is not None)
    >>> ## inspect aggregated info of single visitor id
    >>> processed.take(1)

    ``_process`` calls every method whose name does not start with an
    underscore and stores its output under the method's name, so each
    public method below defines one key of the output and private helpers
    MUST keep their leading underscore.

    The public methods take no arguments, iterate over ``self.sessions``
    and generally return ``str()`` of a list or of a dict keyed by a
    running 1-based entry number.
    """

    # (value_details key, output key) pairs copied into each order entry.
    _ORDER_FIELDS = (
        ('ns_brand', 'brand'),
        ('ns_prod_price', 'price'),
        ('ns_qty', 'qty'),
        ('ns_prod_grp', 'prod_grp'),
        ('ns_shop', 'shop_details'),
        ('payment_type', 'payment_type'),
        ('order_source', 'order_source'),
    )

    def _stamp(self, session):
        """Format the session start time as ``YYYY-MM-DD HH:MM:SS``."""
        return session['session_start_time'].strftime('%Y-%m-%d %H:%M:%S')

    def _numbered(self, entries):
        """Return ``{1: entries[0], 2: entries[1], ...}``."""
        return dict(enumerate(entries, 1))

    def _mapping(self, session, field):
        """Return ``session[field]``, or ``{}`` when missing or empty."""
        return session.get(field) or {}

    def duration_Location(self):
        """Per-session duration, page count, start date and geo location."""
        entries = []
        for s in self.sessions:
            entry = {}
            entry['duration'] = s['duration']
            entry['date'] = self._stamp(s)
            entry['pages_visited_per_session'] = s['records_in_session']
            entry['city'] = s['city']
            entry['country'] = s['country']
            entry['time_zone'] = s['time_zone']
            entries.append(entry)
        return str(self._numbered(entries))

    def _get_tuples_from_list(self, val):
        """
        Group the pairs stored under ``val`` across ALL sessions.

        :returns: ``defaultdict(list)`` mapping each pair's first element
            to the list of second elements.
        """
        grouped = defaultdict(list)
        for s in self.sessions:
            for item in s[val]:
                grouped[item[0]].append(item[1])
        return grouped

    def categories_viewed(self):
        """
        Per session, the categories viewed with time spent, visit count and
        the number of pages recorded for that category over all sessions.
        """
        # Loop-invariant: computed once rather than once per session.
        cats_pages = self._get_tuples_from_list('category_page_path')
        entries = []
        for s in self.sessions:
            date = self._stamp(s)
            per_category = {}
            for c in s['categories_viewed']:
                cat_name = c[0]
                if cat_name in per_category:
                    per_category[cat_name]['visit_count'] += 1
                    per_category[cat_name]['time_spent'] += c[1]
                else:
                    per_category[cat_name] = {
                        'visited_on': date,
                        'time_spent': c[1],
                        'visit_count': 1,
                        'num_pages_visited': len(cats_pages.get(cat_name, [])),
                    }
            entries.append(per_category)
        return str(self._numbered(entries))

    def user_agent(self):
        """Per-session user agent string, duration and start day."""
        entries = []
        for s in self.sessions:
            entry = {}
            entry['user_agent'] = s['user_agent']
            entry['duration'] = s['duration']
            entry['date'] = s['session_start_time'].strftime('%Y-%m-%d')
            entries.append(entry)
        return str(self._numbered(entries))

    def external_referrers(self):
        """All external referrers over all sessions, in session order."""
        refs = []
        for s in self.sessions:
            refs.extend(s['external_referrers'])
        return str(refs)

    def internal_referrers(self):
        """All internal referrers over all sessions, in session order."""
        refs = []
        for s in self.sessions:
            refs.extend(s['internal_referrers'])
        return str(refs)

    def order_details(self):
        """
        One entry per session whose ``value_details`` holds ``ns_order_id``:
        order date, the available order fields and the number of visited
        pages whose host name contains an ``order`` label.
        """
        entries = []
        for s in self.sessions:
            value_details = self._mapping(s, 'value_details')
            if 'ns_order_id' not in value_details:
                continue
            entry = {'order_date': self._stamp(s)}
            for source_key, output_key in self._ORDER_FIELDS:
                if source_key in value_details:
                    entry[output_key] = value_details[source_key]
            order_pages = 0
            for path in s['page_path']:
                host = urlparse("http://" + path[0]).netloc
                if 'order' in host.split('.'):
                    order_pages += 1
            entry['num_pagevisits_prior_to_order'] = order_pages
            entries.append(entry)
        return str(self._numbered(entries))

    def healthcare(self):
        """
        eBook sign-up popup interactions (submitted / closed) and selected
        conditions, one entry per session that has any of them.
        """
        entries = []
        for s in self.sessions:
            date = self._stamp(s)
            val_det = self._mapping(s, 'value_details')
            in_popup = 'eBook Sign Up Popup' in val_det.get('link_module', {})
            link_names = val_det.get('link_name', {})
            entry = {}
            if in_popup and 'I want my free eBook' in link_names:
                # Key spelling (sic) kept: it is part of the stored output.
                entry['sumbitted_date'] = date
            if in_popup and 'close' in link_names:
                entry['closed_date'] = date
            if 'conditions_selected' in s:
                entry['conditions_selected'] = s['conditions_selected']
            if entry:
                entries.append(entry)
        return str(self._numbered(entries))

    def search(self):
        """
        One entry per on-site search term: date, term, result count and,
        when they line up one-to-one with the terms, page number and
        category.  Term j is paired with the j-th result / page / category.
        """
        entries = []
        for s in self.sessions:
            val_dets = self._mapping(s, 'value_details')
            if 'ns_search_result' not in val_dets:
                continue
            terms = list((val_dets.get('ns_search_term') or {}).keys())
            if not terms:
                continue
            results = list(val_dets['ns_search_result'].keys())
            categories = list(val_dets['search_cat'].keys()) if 'search_cat' in val_dets else []
            page_numbers = list(val_dets['search_pg_no'].keys()) if 'search_pg_no' in val_dets else []
            date = self._stamp(s)
            for j, term in enumerate(terms):
                entry = {}
                entry['search_date'] = date
                entry['search_term'] = term
                entry['search_results_found'] = results[j] if j < len(results) else None
                if page_numbers and len(page_numbers) == len(terms):
                    entry['selected_page_no'] = page_numbers[j]
                if categories and len(categories) == len(terms):
                    entry['selected_page_category'] = categories[j]
                entries.append(entry)
        return str(self._numbered(entries))

    def ruids(self):
        """Unique ``ruid`` values over all sessions."""
        found = [s['ruid'] for s in self.sessions if 'ruid' in s]
        return str(list(set(found)))

    def epids(self):
        """Unique ``epid`` values that do not look like e-mail addresses."""
        found = []
        for s in self.sessions:
            for e in s.get('epid', []):
                if not self.email_regex.match(e):
                    found.append(e)
        return str(list(set(found)))

    def emails(self):
        """
        Unique e-mail-looking values gathered from ``epid``, ``ruid`` and
        ``user_email_from_query_strings``.
        """
        found = []
        for s in self.sessions:
            for e in s.get('epid', []):
                if self.email_regex.match(e):
                    found.append(e)
            if 'ruid' in s and self.email_regex.match(s['ruid']):
                found.append(s['ruid'])
            for e in s.get('user_email_from_query_strings', []):
                if self.email_regex.match(e):
                    found.append(e)
        return str(list(set(found)))

    def mids(self):
        """Unique ``_mid`` query-string values."""
        found = []
        for s in self.sessions:
            query = self._mapping(s, 'consolidated_query_strings')
            if '_mid' in query:
                found.extend(query['_mid'])
        return str(list(set(found)))

    def ip(self):
        """Unique IP addresses over all sessions."""
        return str(list(set(s['ip'] for s in self.sessions)))

    def trkids(self):
        """
        Unique tracking ids from the ``int_trkid``, ``trkid`` and
        ``internal_tracking_id`` query-string parameters.
        """
        found = []
        for s in self.sessions:
            query = self._mapping(s, 'consolidated_query_strings')
            for key in ('int_trkid', 'trkid', 'internal_tracking_id'):
                if key in query:
                    found.extend(query[key])
        return str(list(set(found)))

    def login_registration(self):
        """Registration source/date and login type/date per session."""
        entries = []
        for s in self.sessions:
            date = self._stamp(s)
            value_details = self._mapping(s, 'value_details')
            event_details = self._mapping(s, 'event_details')
            entry = {}
            if 'reg_source' in value_details:
                entry['registration_source'] = list(value_details['reg_source'].keys())
                entry['registration_date'] = date
            if 'login_event' in event_details:
                entry['login_date'] = date
            if 'login_type' in value_details:
                entry['login_date'] = date
                entry['login_type'] = list(value_details['login_type'].keys())
            if entry:
                entries.append(entry)
        return str(self._numbered(entries))

    def _get_campaign_data(self, camp_type):
        """
        One entry per session whose ``value_details`` holds ``camp_type``:
        ``{<camp_type>s_visited: [...], 'visit_date': ...}``.
        """
        entries = []
        for s in self.sessions:
            value_details = self._mapping(s, 'value_details')
            if camp_type not in value_details:
                continue
            entry = {}
            entry[camp_type + 's_visited'] = list(value_details[camp_type].keys())
            entry['visit_date'] = self._stamp(s)
            entries.append(entry)
        return str(self._numbered(entries))

    def ns_campaign(self):
        return self._get_campaign_data('ns_campaign')

    def email_campaign(self):
        return self._get_campaign_data('email_campaign')

    def partner_name(self):
        return self._get_campaign_data('partner_name')

    def _get_share_details(self, share_tag):
        """
        One entry per ``(category, page)`` pair stored under ``share_tag``,
        with the page's content type ('' when unknown) and the session date.
        """
        entries = []
        for s in self.sessions:
            shared = s.get(share_tag)
            if not shared:
                continue
            date = self._stamp(s)
            page_contents = s.get('page_content_type') or []
            for sfp in shared:
                page = sfp[1]
                # First matching (page, content_type) pair wins.
                content_type = next((c for p, c in page_contents if p == page), '')
                entry = {}
                entry['category'] = sfp[0]
                entry['page'] = page
                entry['content_type'] = content_type
                entry['date'] = date
                entries.append(entry)
        return str(self._numbered(entries))

    def shared_on_facebook(self):
        return self._get_share_details('categories_shared_on_facebook')

    def shared_on_pinterest(self):
        return self._get_share_details('categories_shared_on_pinterest')

    def shared_on_twitter(self):
        return self._get_share_details('categories_shared_on_twitter')

    def shared_on_google_plus(self):
        return self._get_share_details('categories_shared_on_google_plus')

    def stf_count_event(self):
        """Form name / content type of ``tell_a_friend`` form submissions."""
        entries = []
        for s in self.sessions:
            val_dets = self._mapping(s, 'value_details')
            if 'tell_a_friend' not in val_dets.get('form_type', {}):
                continue
            entry = {}
            if 'form_name' in val_dets:
                entry['form_name'] = list(val_dets['form_name'].keys())
            if 'content_type' in val_dets:
                entry['content_type'] = list(val_dets['content_type'].keys())
            if entry:
                entry['date'] = self._stamp(s)
                entries.append(entry)
        return str(self._numbered(entries))

    def tellafriend_event(self):
        """
        One entry per session that has a ``tellafriend_event`` and a
        ``value_details`` key; the entry always carries the date.
        """
        entries = []
        for s in self.sessions:
            if 'tellafriend_event' not in self._mapping(s, 'event_details'):
                continue
            if 'value_details' not in s:
                continue
            val_dets = s['value_details']
            entry = {}
            for key in ('form_type', 'form_name', 'content_type'):
                if key in val_dets:
                    entry[key] = list(val_dets[key].keys())
            entry['date'] = self._stamp(s)
            entries.append(entry)
        return str(self._numbered(entries))

    def comments(self):
        """Qualitative feedback texts with their session date."""
        entries = []
        for s in self.sessions:
            value_details = self._mapping(s, 'value_details')
            if 'qualitative_feedback_text' not in value_details:
                continue
            entry = {}
            entry['comments'] = list(value_details['qualitative_feedback_text'].keys())
            entry['date'] = self._stamp(s)
            entries.append(entry)
        return str(self._numbered(entries))

    def user_preferences(self):
        """Opt-in, opt-out and subscription preference values per session."""
        entries = []
        for s in self.sessions:
            val_dets = self._mapping(s, 'value_details')
            entry = {}
            if 'optin' in val_dets:
                entry['optin'] = list(val_dets['optin'].keys())
            if 'optout' in val_dets:
                entry['optout'] = list(val_dets['optout'].keys())
            if 'form_sub_pref' in val_dets:
                entry['preferences'] = list(val_dets['form_sub_pref'].keys())
            if entry:
                entry['date'] = self._stamp(s)
                entries.append(entry)
        return str(self._numbered(entries))

    def newsletters(self):
        """
        Newsletter sign-up flag and newsletter names per session.  Sessions
        without a ``value_details`` key are skipped entirely.
        """
        entries = []
        for s in self.sessions:
            if 'value_details' not in s:
                continue
            val_dets = s['value_details']
            entry = {}
            if 'nl_signup_event' in self._mapping(s, 'event_details'):
                entry['nl_signup'] = 'True'
            if 'nl_name' in val_dets:
                entry['nl_name'] = list(val_dets['nl_name'].keys())
            if entry:
                entry['date'] = self._stamp(s)
                entries.append(entry)
        return str(self._numbered(entries))

    def forms_submitted(self):
        """Names and types of the forms submitted in each session."""
        entries = []
        for s in self.sessions:
            val_dets = self._mapping(s, 'value_details')
            entry = {}
            # form_name lives in value_details, like form_type.
            if 'form_name' in val_dets:
                entry['form_name'] = list(val_dets['form_name'].keys())
            if 'form_type' in val_dets:
                entry['form_type'] = list(val_dets['form_type'].keys())
            if entry:
                entry['date'] = self._stamp(s)
                entries.append(entry)
        return str(self._numbered(entries))

    def page_details(self):
        """
        ``str()`` of the ``page_details`` of the first session in
        ``self.sessions``; None (key omitted) when there are no sessions.
        """
        # NOTE(review): only the first session is reported and later ones
        # are ignored - confirm this is intended before aggregating more.
        if not self.sessions:
            return None
        return str(self.sessions[0]['page_details'])

    def _get_event_entry_date(self, event_name):
        """Start dates of the sessions whose event_details has ``event_name``."""
        dates = [self._stamp(s) for s in self.sessions
                 if event_name in self._mapping(s, 'event_details')]
        return str(dates)

    def holiday__celebration_signup_event(self):
        return self._get_event_entry_date('holiday__celebration_signup_event')

    def back_to_school_signup_event(self):
        return self._get_event_entry_date('back_to_school_signup_event')

    def top_10_signup_event(self):
        return self._get_event_entry_date('top_10_signup_event')

    def weeknight_dinners_signup_event(self):
        return self._get_event_entry_date('weeknight_dinners_signup_event')

    def thanksgiving_countdown_signup_event(self):
        return self._get_event_entry_date('thanksgiving_countdown_signup_event')

    def christmas_cookie_countdown_signup_event(self):
        return self._get_event_entry_date('christmas_cookie_countdown_signup_event')

    def dessert_signup_event(self):
        return self._get_event_entry_date('dessert_signup_event')

    def taste_of_home_special_edition_signup_event(self):
        return self._get_event_entry_date('taste_of_home_special_edition_signup_event')

    def slow_cooker__casserole_signup_event(self):
        return self._get_event_entry_date('slow_cooker__casserole_signup_event')

    def healthy_signup_event(self):
        return self._get_event_entry_date('healthy_signup_event')

    def country_woman_insider_signup_event(self):
        return self._get_event_entry_date('country_woman_insider_signup_event')

    def recipe_of_the_day_signup_event(self):
        return self._get_event_entry_date('recipe_of_the_day_signup_event')

    def cooking_school_signup_event(self):
        return self._get_event_entry_date('cooking_school_signup_event')

    def sweeps_entry_event(self):
        return self._get_event_entry_date('sweeps_entry_event')

    def cancel_mag_event(self):
        return self._get_event_entry_date('cancel_mag_event')

    def cancel_ser_event(self):
        return self._get_event_entry_date('cancel_ser_event')

    def slideshow_event(self):
        return self._get_event_entry_date('slideshow_event')

    def slideshow_ad_event(self):
        return self._get_event_entry_date('slideshow_ad_event')

    def google_search_strings(self):
        """All Google search strings extracted from referrers."""
        details = []
        for s in self.sessions:
            if 'google_search_from_referrer' in s:
                details.extend(s['google_search_from_referrer'])
        return str(details)

In [None]:
def merge_duration_location(existing_data, processed_data):
    """
    Merge two stringified ``{key: [(name, values), ...]}`` mappings.

    For every key found in either input, the ``(name, values)`` pairs of the
    existing data are followed by those of the processed data and the value
    lists of equal names are concatenated.

    :param existing_data: ``str()`` of the previously stored mapping.
    :param processed_data: ``str()`` of the newly processed mapping.
    :returns: dict mapping each key to a list of ``(name, merged_values)``.

    NOTE(review): ``AggregateProcessor.duration_Location`` emits
    ``{index: {field: value}}`` (dict values, not lists of pairs); that
    shape is not handled here - confirm the intended input format.
    """
    existing = ast.literal_eval(existing_data)
    processed = ast.literal_eval(processed_data)
    # Existing keys first, then keys that only appear in the new data.
    all_keys = list(existing) + [k for k in processed if k not in existing]
    merged = {}
    for key in all_keys:
        combined = defaultdict(list)
        pairs = list(existing.get(key, [])) + list(processed.get(key, []))
        for name, values in pairs:
            combined[name].extend(values)
        merged[key] = list(combined.items())
    return merged
    
def merge_categories(existing_data, processed_data):
    """
    Parse both stringified category mappings and return the processed one.

    The existing data is parsed (so malformed input still raises) but its
    content is currently not merged into the result.
    """
    ast.literal_eval(existing_data)
    return ast.literal_eval(processed_data)
    
def merge_aggregate_data(row):
    """
    Merge a visitor's newly processed aggregate with the stored one.

    :param row: ``(ns_vid, processed_data, existing_data)`` where both data
        elements are stringified dicts.
    :returns: ``(ns_vid, str(merged))`` holding only the
        ``duration_Location`` and ``categories_viewed`` fields.
    """
    visitor_id = row[0]
    fresh = ast.literal_eval(row[1])
    stored = ast.literal_eval(row[2])
    merged = {}
    merged['duration_Location'] = merge_duration_location(
        stored['duration_Location'], fresh['duration_Location'])
    merged['categories_viewed'] = merge_categories(
        stored['categories_viewed'], fresh['categories_viewed'])
    return (visitor_id, str(merged))

def get_new_data(row):
    """
    Pass a ``(ns_vid, data)`` row through, re-serialising the data.

    The stringified data is parsed and converted back with ``str()``, so
    malformed data raises and the text is normalised.
    """
    return (row[0], str(ast.literal_eval(row[1])))

def unionAll(*dfs):
    """
    Union any number of DataFrames, left to right.

    Implemented with each object's own ``unionAll`` method so that neither
    ``reduce`` (not a builtin on Python 3) nor the ``DataFrame`` class has
    to be in scope.

    :param dfs: one or more objects exposing ``unionAll(other)``.
    :returns: the union of all inputs (the input itself for a single one).
    :raises TypeError: when called without arguments.
    """
    if not dfs:
        raise TypeError("unionAll() requires at least one DataFrame")
    combined = dfs[0]
    for frame in dfs[1:]:
        combined = combined.unionAll(frame)
    return combined

def diff(first, second):
    """Return the items of ``first`` (order and duplicates kept) that are not in ``second``."""
    excluded = set(second)
    kept = []
    for element in first:
        if element in excluded:
            continue
        kept.append(element)
    return kept

In [None]:
# Run parameters: aggregate month, site and the first-pass dates to load.
month_year = '201609'
site = 'rd'
processing_dates = ['2015-12-29']

# Read the first-pass parquet output for every date and key each row's
# evaluated session dicts by visitor id, then combine them per visitor.
input_paths = [make_first_pass_path(day, site) for day in processing_dates]
vsessions = sqlContext.read.parquet(*input_paths).rdd
vsessions_group = vsessions.map(
    lambda row: (row.id, [eval_dict(raw) for raw in row.sessions]),
    preservesPartitioning=True)
vsessions_comb = vsessions_group.reduceByKey(combineLists)

In [None]:
def get_search_details(row):
    """
    Return ``row`` when any of its sessions (``row[1]``) carries a
    ``google_search_from_referrer`` key, otherwise None.
    """
    has_search = any('google_search_from_referrer' in session.keys()
                     for session in row[1])
    return row if has_search else None

# Keep only visitors with at least one Google-referred search and preview them.
data = (vsessions_comb
        .map(get_search_details)
        .filter(lambda row: row is not None))
data.take(10)

In [None]:
def get_search_details_from_val_dets(row):
    """
    Return ``str()`` of the first session in ``row[1]`` whose
    ``event_details`` contains ``add_groc_event``; None when there is none.
    """
    for session in row[1]:
        if 'event_details' not in session.keys():
            continue
        if 'add_groc_event' in session['event_details'].keys():
            return str(session)
    return None

# Preview the sessions that contain an add_groc_event.
data = (vsessions_comb
        .map(get_search_details_from_val_dets)
        .filter(lambda session_text: session_text is not None))
data.take(10)

In [10]:
# Build one aggregate record per visitor and drop the records for which
# the processor returned None.
ap = AggregateProcessor(month_year, processing_dates, site)
processed = (vsessions_comb
             .map(ap)
             .filter(lambda rec: rec is not None))

In [None]:
# Sanity check: show two (ns_vid, stringified aggregate) records.
processed.take(2)

In [11]:
def from_grpd(row):
    """
    Parse the stringified aggregate in ``row[1]`` and return it when its
    ``google_search_strings`` list is non-empty, otherwise None.
    """
    aggregate = ast.literal_eval(row[1])
    search_strings = ast.literal_eval(aggregate['google_search_strings'])
    if not search_strings:
        return None
    return aggregate
# Preview the aggregates of visitors that arrived through a Google search.
grpd = (processed
        .map(from_grpd)
        .filter(lambda aggregate: aggregate is not None))
grpd.take(10)

[{'back_to_school_signup_event': '[]',
  'cancel_mag_event': '[]',
  'cancel_ser_event': '[]',
  'categories_viewed': "{1: {'advice~relationships': {'time_spent': 45, 'num_pages_visited': 1, 'visited_on': '2015-12-29 21:57:10', 'visit_count': 1}}}",
  'christmas_cookie_countdown_signup_event': '[]',
  'comments': '{}',
  'cooking_school_signup_event': '[]',
  'country_woman_insider_signup_event': '[]',
  'dessert_signup_event': '[]',
  'duration_Location': "{1: {'duration': 50, 'pages_visited_per_session': 2, 'time_zone': 'Asia/Kathmandu', 'date': '2015-12-29 21:57:10', 'country': 'Nepal', 'city': 'kathmandu'}}",
  'email_campaign': '{}',
  'emails': '[]',
  'epids': '[]',
  'external_referrers': "['www.google.com.np/search']",
  'forms_submitted': '{}',
  'google_search_strings': "['love story fictions']",
  'healthcare': '{}',
  'healthy_signup_event': '[]',
  'holiday__celebration_signup_event': '[]',
  'internal_referrers': "['www.rd.com/advice/relationships/the-best-love-stories-o

In [None]:
# Field names produced by this run, read from one processed record
# ((ns_vid, stringified dict)).  Defined here so the cell does not depend
# on a variable left over from an earlier kernel session.
processed_sample = processed.take(1)
processed_methods = (list(ast.literal_eval(processed_sample[0][1]).keys())
                     if processed_sample else [])

# Defaults for the case where no aggregate has been stored yet.
existing_aggregate = None
new_tags = []
if check_aggregate_processed(month_year, site):
    existing_aggregate = sqlContext.read.parquet(S3_USER_AGGREGATE.format(month_year=month_year,site=site))
    sample = existing_aggregate.take(1)
    existing_methods = list(ast.literal_eval(sample[0][1]).keys()) if sample else []
    # Fields present on only one side: new in this run, then stored-only.
    new_tags = diff(processed_methods, existing_methods)
    old_unprocessed_tags = diff(existing_methods, processed_methods)
    new_tags.extend(old_unprocessed_tags)
existing_aggregate.take(10) if existing_aggregate is not None else []

In [None]:
# Schemas of the freshly processed records and of the stored aggregate.
processed_user_schema = StructType([
   StructField("ns_vid", StringType(), True),
   StructField("processed_data", StringType(), True),
])
aggregate_user_schema = StructType([
   StructField("ns_vid", StringType(), True),
   StructField("data", StringType(), True),
])

print('existing')
existing_aggregate.registerTempTable("existing")
processed_sql = sqlContext.createDataFrame(processed, processed_user_schema)
processed_sql.registerTempTable("processed")

# Visitors present on both sides: merge new data into the stored data.
# Row-wise Python functions go through ``.rdd`` (as in the load cell),
# which works on every Spark version; ``DataFrame.map`` does not.
merged_users = sqlContext.sql("SELECT processed.ns_vid, processed.processed_data, existing.data FROM processed, existing  WHERE processed.ns_vid = existing.ns_vid")
merged_users_data = merged_users.rdd.map(merge_aggregate_data)

# Visitors seen for the first time in this run.
new_users = sqlContext.sql("SELECT processed.ns_vid, processed.processed_data FROM processed left join existing on  processed.ns_vid = existing.ns_vid where existing.ns_vid is NULL")
new_users_data = new_users.rdd.map(get_new_data)

# Stored visitors without activity in this run.
existing_users = sqlContext.sql("SELECT existing.ns_vid, existing.data FROM existing left join processed on  existing.ns_vid = processed.ns_vid where processed.ns_vid is NULL")
existing_users_data = existing_users.rdd.map(get_new_data)


df_new_processed = sqlContext.createDataFrame(new_users_data, aggregate_user_schema)
merged_users_data.take(10)
#df_merged = sqlContext.createDataFrame(merged_users_data, aggregate_user_schema)
#df_existing_unchanged = sqlContext.createDataFrame(existing_users_data, aggregate_user_schema)
#all_data = unionAll(df_new_processed, df_merged, df_existing_unchanged)
#all_data.repartition(20).write.parquet(S3_USER_AGGREGATE.format(month_year=month_year,site=site), mode='overwrite')
