In [1]:
import sys
import findspark
from pyspark.sql import SparkSession

findspark.init()

spark = SparkSession.builder.appName('example application').getOrCreate()
assert sys.version_info >= (3, 4) # make sure we have Python 3.4+
assert spark.version >= '2.2' # make sure we have Spark 2.2+

In [None]:
# Combine data into Single text file <Sources: Insta, FB, Twitter, FB>
# No cleaning
from pyspark.sql.types import StructField, StructType, StringType, ArrayType, IntegerType
from pyspark.sql.functions import lower, col, udf, concat_ws, collect_list
from pyspark.sql import Row
from nltk.corpus import stopwords
import nltk
import pandas as pd
from os import listdir, makedirs
from os.path import isfile, join, exists
import json
import re

all_data = pd.read_csv('influencer_list.csv', sep=',')
print(all_data.shape)

sc = spark.sparkContext

def get_handle(url):
    splits = url.split('/')
    if url:
        if url.endswith('/'):
            handle = splits[-2]
        else:
            handle = splits[-1]
    else:
        handle = None    
    return handle

tw_fields = ['screen_name', 'tweet_text', 'hashtags', 'favorites', 'retweet_count']
tw_schema =  StructType([
    StructField('screen_name', StringType(), True),
    StructField('tweet_text', StringType(), True),
    StructField('hashtags', ArrayType(StringType()), True),
    StructField('favorites', IntegerType(), True),
    StructField('retweet_count', IntegerType(), True),
])

ig_fields = ['twitter_handle', 'instagram_handle', 'likes', 'comments',
         'hashtags', 'caption', 'timestamp', 'image_thumbnail']

ig_schema =  StructType([
    StructField('twitter_handle', StringType(), True),
    StructField('instagram_handle', StringType(), True),
    StructField('likes', IntegerType(), True),
    StructField('comments', IntegerType(), True),
    StructField('hashtags', ArrayType(StringType()), True),
    StructField('caption', StringType(), True),
    StructField('timestamp', StringType(), True),
    StructField('image_thumbnail', StringType(), True),
])

fb_fields = ['twitter_handle', 'fb_handle', 'fb_name', 'fb_no_of_comments',
         'fb_time_created', 'fb_description', 'fb_post_link', 'fb_img_link'
         'fb_shares', 'fb_type']

fb_schema =  StructType([
    StructField('twitter_handle', StringType(), True),
    StructField('fb_handle', StringType(), True),
    StructField('fb_name', StringType(), True),
    StructField('fb_no_of_comments', IntegerType(), True),
    StructField('fb_time_created', StringType(), True),
    StructField('fb_description', StringType(), True),
    StructField('fb_post_link', StringType(), True),
    StructField('fb_img_link', StringType(), True),
    StructField('fb_shares', IntegerType(), True),
    StructField('fb_type', StringType(), True),
])

yt_fields = ['twitter_handle', 'video_id', 'likes', 'dislikes',
         'comments', 'views', 'title', 'description'
         'tags', 'publishat', 'cc_filename']

yt_schema =  StructType([
    StructField('twitter_handle', StringType(), True),
    StructField('video_id', StringType(), True),
    StructField('likes', IntegerType(), True),
    StructField('dislikes', IntegerType(), True),
    StructField('comments', IntegerType(), True),
    StructField('views', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('description', StringType(), True),
    StructField('tags', ArrayType(StringType()), True),
    StructField('publishat', StringType(), True),
    StructField('cc_filename', StringType(), True),
])

yt_path = 'youtube-data'

for index, row in all_data.iterrows():
    tw_handle = get_handle(row['Twitter'])
    errors = ['Dave2Dtv', 'susiebubble', 'ladolcevitablog', 'OMGitsfirefoxx', 'takahashimari',
             'mannymua733','zoella']
    if tw_handle in errors:
        tw_path = 'twitter-data/{}'.format(tw_handle)
        ig_path = 'instagram-data/{}'.format(tw_handle)
        fb_path = 'facebook-data/{}'.format(tw_handle)

        tw_df = spark.read.json(tw_path, schema=tw_schema).select('tweet_text')
        tw_string = tw_df.agg(concat_ws(". ", collect_list(tw_df.tweet_text))).alias('all_tweets')
        if not exists(fb_path):        
            ig_df = spark.read.json(yt_path, schema=ig_schema).select('caption')
            ig_string = ig_df.agg(concat_ws(". ", collect_list(ig_df.caption))).alias('all_captions')
            unioned = tw_string.union(ig_string).alias('unioned')
            print("No FB")
        elif not exists(ig_path):
            fb_df = spark.read.json(fb_path, schema=fb_schema).select('fb_description')
            fb_string = fb_df.agg(concat_ws(". ", collect_list(fb_df.fb_description))).alias('all_descriptions')
            unioned = tw_string.union(fb_string).alias('unioned')
            print("No IG")
        else:
            unioned = tw_string.union(ig_string).union(fb_string).alias('unioned')
            print("All available")

        output_path = '3-combined-data/{}'.format(tw_handle)
        print("Working on:{}".format(tw_handle))
        unioned.write.text(output_path)


In [None]:
from pyspark.sql.types import StructField, StructType, StringType, ArrayType, IntegerType
from pyspark.sql.functions import lower, col, udf
from pyspark.sql import Row
from nltk.corpus import stopwords
import nltk
import pandas as pd
from os import listdir, makedirs
from os.path import isfile, join, exists
import json
import re

extended_stopwords = [word.strip().lower() for word in open('g10000.txt')]

all_data = pd.read_csv('influencer_list.csv', sep=',')
print(all_data.shape)

sc = spark.sparkContext

def get_handle(url):
    splits = url.split('/')
    if url:
        if url.endswith('/'):
            handle = splits[-2]
        else:
            handle = splits[-1]
    else:
        handle = None    
    return handle


category_dict = dict()
for index, row in all_data.iterrows():
    handle = get_handle(row['Twitter'])
    category_dict[handle] = str(row['Category']).lower()


youtube_words = ['youtube', 'yt', 'https', 'http'] #Need to add more if required
fields = ['twitter_handle', 'video_id', 'likes', 'dislikes',
         'comments', 'views', 'title', 'description'
         'tags', 'publishat', 'cc_filename']

schema =  StructType([
    StructField('twitter_handle', StringType(), True),
    StructField('video_id', StringType(), True),
    StructField('likes', IntegerType(), True),
    StructField('dislikes', IntegerType(), True),
    StructField('comments', IntegerType(), True),
    StructField('views', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('description', StringType(), True),
    StructField('tags', ArrayType(StringType()), True),
    StructField('publishat', StringType(), True),
    StructField('cc_filename', StringType(), True),
])

def clean_cc(text):
    text = text.lower()
    if text:
        cleaned = re.sub("(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)"," ", text).split()
        cleaned_ws = [word for word in cleaned if word not in stopwords and word not in youtube_words \
                     and word not in extended_stopwords]
        cleaned_wl = [lemma.lemmatize(word) for word in cleaned_ws]

        cleaned_wl = ' '.join(cleaned_ws) 
        return cleaned_wl.strip()
    else:
        return None
    
stopwords = stopwords.words('english')
lemma = nltk.wordnet.WordNetLemmatizer()

cleaner = udf(clean_cc, StringType())

path = 'youtube-data-v2'
youtube_files = [f for f in listdir(path) if isfile(join(path, f)) and (not f.startswith('.'))]

for youtube_file in youtube_files:
    print('Working on file: {}'.format(youtube_file))
    df = spark.read.json('youtube-data-v2/{}'.format(youtube_file), schema=schema)
    cc_files = list(df.select('cc_filename').collect())
    category = category_dict[youtube_file]
    print('Working on file: {} Category: {}'.format(youtube_file, category))
    cc_path_prefix = '{}/others/{}/{}_cc/'.format(path, category, category)
    output_path_prefix = cc_path_prefix.replace(category + '_cc', category + '_cc_cleaned')
    output_directory = output_path_prefix + youtube_file + '/'
    if not exists(output_directory):
        makedirs(output_directory)
    for cc_file in cc_files:
        cc_name = cc_file['cc_filename']
        cc_file_path = cc_path_prefix + cc_name + '.txt'
        if exists(cc_file_path):
            cc_txt = sc.textFile(cc_file_path)
            cleaned_cc = cc_txt.map(clean_cc)
            output_path = output_directory + cc_name
            if not exists(output_path):
                cleaned_cc.saveAsTextFile(output_path)
            else:
                print("NOTICE-EXISTS;{}".format(output_path))

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, ArrayType, IntegerType
from pyspark.sql.functions import lower, col, udf
from pyspark.sql import Row
from nltk.corpus import stopwords
import nltk
import pandas as pd
from os import listdir, makedirs
from os.path import isfile, join, exists
import json
import re

all_data = pd.read_csv('influencer_list.csv', sep=',')
print(all_data.shape)

sc = spark.sparkContext
# sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

def get_handle(url):
    splits = url.split('/')
    if url:
        if url.endswith('/'):
            handle = splits[-2]
        else:
            handle = splits[-1]
    else:
        handle = None    
    return handle


category_dict = dict()
for index, row in all_data.iterrows():
    handle = get_handle(row['Twitter'])
    category_dict[handle] = str(row['Category']).lower()


youtube_words = ['youtube', 'yt', 'https', 'http'] #Need to add more if required
fields = ['twitter_handle', 'video_id', 'likes', 'dislikes',
         'comments', 'views', 'title', 'description'
         'tags', 'publishat', 'cc_filename']

schema =  StructType([
    StructField('twitter_handle', StringType(), True),
    StructField('video_id', StringType(), True),
    StructField('likes', IntegerType(), True),
    StructField('dislikes', IntegerType(), True),
    StructField('comments', IntegerType(), True),
    StructField('views', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('description', StringType(), True),
    StructField('tags', ArrayType(StringType()), True),
    StructField('publishat', StringType(), True),
    StructField('cc_filename', StringType(), True),
])

path = 'youtube-data-v2'
youtube_files = [f for f in listdir(path) if isfile(join(path, f)) and (not f.startswith('.'))]

for youtube_file in youtube_files:
    df = spark.read.json('youtube-data-v2/{}'.format(youtube_file), schema=schema)
    cc_files = list(df.select('cc_filename').collect())
    category = category_dict[youtube_file]
    print('Working on file: {} Category: {}'.format(youtube_file, category))
    cc_path_prefix = '{}/others/{}/{}_cc/'.format(path, category, category)
    locations = list()
    for cc_file in cc_files:
        cc_name = cc_file['cc_filename']
        cc_file_path = cc_path_prefix + cc_name + '.txt'
        if exists(cc_file_path):
            locations.append(cc_file_path)
    loc_str = ','.join(locations)
    sample_rdd = sc.textFile(loc_str)
    output_loc = 'youtube-combined-cc/{}'.format(youtube_file)

    try:
        sample_rdd.coalesce(1).saveAsTextFile(output_loc)
    except Exception:
        print('error on: {}'.format(youtube_file))

In [None]:
all_details = pd.read_csv('all_influencers_details.csv', sep=';')
for index, row in all_details.iterrows():
    all_locations = list()
    tw_handle = row['tw_handle']
    combined_3_path = '3-combined-data/{}/'.format(tw_handle)
    for f in listdir(combined_3_path):
        if isfile(join(combined_3_path, f)) and (not f.startswith('.')) and (not f.startswith('_')):
            all_locations.append(join(combined_3_path, f))
    youtube_comb_cc_path = 'youtube-combined-cc/{}/'.format(tw_handle)
    if exists(youtube_comb_cc_path):
        for f in listdir(youtube_comb_cc_path):
            if isfile(join(youtube_comb_cc_path, f)) and (not f.startswith('.')) and (not f.startswith('_')):
                all_locations.append(join(youtube_comb_cc_path, f))
    locations_str = ','.join(all_locations)
#     print(locations_str)
    rdd = sc.textFile(locations_str)
    output_loc_comb = 'combined-data-4-sources/{}'.format(tw_handle)
    print("Working on:{}".format(tw_handle))
    try:
        rdd.coalesce(1).saveAsTextFile(output_loc_comb)
    except Exception:
        print('error on: {}'.format(tw_handle))
    

In [None]:
#         res = selected.join(df, on=df['class_description'] == selected['vgg19_class_description']).orderBy('score')
#         res.select('vgg19_class_description', 'score', 'image').drop_duplicates().orderBy('score').show()
#         res = df.select('*').where(df['class_description'].isin(distinct_classes))

#         vgg19_gpd = vgg19.groupby('class_description').agg(avg('score').alias('vgg19_score'))
#         vgg16_gpd = vgg16.groupby('class_description').agg(avg('score').alias('vgg16_score'))

# select vgg19 into different df; select vgg16 into different df
# groupby object; agg=average score
# join by object; avgscore-vgg16 and avgscore-vgg19;
# combined score = weighted score for each object with 0.6 and 0.4 weights

In [54]:
from pymongo import MongoClient
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StructField, StructType, StringType, ArrayType, IntegerType
from pyspark.sql.functions import avg, count, col
from os import listdir, makedirs
from os.path import isfile, join, exists

client = MongoClient('localhost', 27017)
db = client.influencers_db
collection = db.cv_collection

image_rec_path = 'keras-vgg-export/reformatted/'
img_vgg_schema =  StructType([
    StructField('class_name', StringType(), True),
    StructField('class_description', StringType(), True),
    StructField('score', DoubleType(), True),
    StructField('image', StringType(), True),
    StructField('model', StringType(), True),
])


keras_files = [f for f in listdir(image_rec_path) if isfile(join(image_rec_path, f)) and (not f.startswith('.'))]

for keras_file in keras_files:
#     if keras_file == 'MKBHD':
    df = spark.read.csv(join(image_rec_path, keras_file), schema = img_vgg_schema, header=True)
    vgg19 = df.select('*').where(df['model'] == 'VGG19').withColumnRenamed('class_description', 'vgg19_class_description')
    vgg16 = df.select('*').where(df['model'] == 'VGG16').withColumnRenamed('class_description', 'vgg16_class_description')
    vgg19_gpd = vgg19.groupby('vgg19_class_description').agg(count('*').alias('vgg19_count'))
    vgg16_gpd = vgg16.groupby('vgg16_class_description').agg(count('*').alias('vgg16_count'))
    joined = vgg19_gpd.join(vgg16_gpd, on=vgg16_gpd['vgg16_class_description'] == vgg19_gpd['vgg19_class_description'])
    combined = joined.withColumn('combined_score', 0.6*joined['vgg19_count'] + 0.4*joined['vgg16_count'])
    combined_pd = combined.toPandas()
    top5_pcile = combined_pd.quantile(0.95)['combined_score']
    selected = combined.select('*').where(combined['combined_score'] >= top5_pcile).orderBy('combined_score', ascending=False)
    distinct_classes = selected.select('vgg19_class_description').distinct().toPandas()['vgg19_class_description'].tolist()
    result = df.filter(col('class_description').isin(distinct_classes)).drop_duplicates(['class_description']).orderBy('score', ascending=False)
    pd_Result = result.toPandas()
    pd_Result['tw_handle'] = keras_file
    #         print(pd_Result.head(10))
    records = pd_Result.to_dict(orient='records')
    op = collection.insert_many(records)
    print(op)

<pymongo.results.InsertManyResult object at 0x1141a4548>
<pymongo.results.InsertManyResult object at 0x1141e4448>
<pymongo.results.InsertManyResult object at 0x1139f9988>
<pymongo.results.InsertManyResult object at 0x113903a48>
<pymongo.results.InsertManyResult object at 0x113868208>
<pymongo.results.InsertManyResult object at 0x1139fab48>
<pymongo.results.InsertManyResult object at 0x1139fa408>
<pymongo.results.InsertManyResult object at 0x11419e188>
<pymongo.results.InsertManyResult object at 0x11414a648>
<pymongo.results.InsertManyResult object at 0x11398e848>
<pymongo.results.InsertManyResult object at 0x113cdf508>
<pymongo.results.InsertManyResult object at 0x11418c388>
<pymongo.results.InsertManyResult object at 0x10e7d0e08>
<pymongo.results.InsertManyResult object at 0x1141a4548>
<pymongo.results.InsertManyResult object at 0x114163648>
<pymongo.results.InsertManyResult object at 0x113df4888>
<pymongo.results.InsertManyResult object at 0x114186448>
<pymongo.results.InsertManyResu

ValueError: need at least one array to concatenate

3