In [1]:
import pandas as pd
import numpy as np
import random as rd
from datetime import datetime as dt
import re
import json
from tweepy import API, Cursor, OAuthHandler
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import register_adapter, AsIs
# Let psycopg2 send numpy int64 / bool_ scalars (e.g. values taken from
# DataFrame rows and passed as query parameters) as unquoted SQL literals.
register_adapter(np.int64, AsIs)
register_adapter(np.bool_, AsIs)

import warnings
# NOTE(review): this hides every warning (including pandas SettingWithCopyWarning),
# which can mask real problems -- consider filtering specific categories instead.
warnings.filterwarnings("ignore")

from config import *



In [2]:
# Twitter API credentials, read from the `tw_acc` mapping
# (presumably provided by `from config import *`):
consumer_key, consumer_secret, access_token, access_secret = (
    tw_acc[credential]
    for credential in ('CONSUMER_KEY', 'CONSUMER_SECRET', 'ACCESS_TOKEN', 'ACCESS_SECRET')
)

In [3]:
# Twitter API connection:
# authenticate with the app (consumer) key/secret plus the account's access token/secret.

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
# wait_on_rate_limit=True: tweepy sleeps until the rate-limit window resets
# instead of raising when a limit is hit.
api = API(auth, wait_on_rate_limit=True)

In [4]:
# Paths, relative to the notebook's working directory:
queries_path = "queries/"       # prefix of the .sql templates used by the pipeline
temp_data_path = "../../data/"  # data directory; users backups are written here
logs_path = "logs/"

# Other run settings:
# NOTE(review): db_users_bkp and app_name are not used in the visible cells.
db_users_bkp = '2018-06-01'
app_name = "usr_table"

In [5]:
# Load the database settings and open a connection to the local deployment.
with open('../config/db.config') as config_file:
    db_config = json.load(config_file)

# psycopg2.connect keyword -> key in db.config
conn = psycopg2.connect(**{
    keyword: db_config[config_key]
    for keyword, config_key in (
        ('dbname', 'db_name'),
        ('user', 'db_user'),
        ('host', 'db_host'),
        ('port', 'db_port'),
        ('password', 'db_password'),
        ('options', 'db_options'),
    )
})
# Every statement is committed as soon as it runs.
conn.autocommit = True
cur = conn.cursor()

schema = db_config['db_schema']

In [6]:
# Load the users table; when it is still empty, seed the crawl from the
# initial users table instead. Either way, users_ls holds screen names.
with open(queries_path + 'SMI_query_users.sql', 'r') as f:
    query = f.read().format(schema=schema)
cur.execute(query)
db_users = pd.DataFrame(cur.fetchall(), columns=list(users_dict.keys())).astype({"id": object})

if db_users.shape[0] > 0:
    users_ls = db_users["screenName"].to_list()
else:
    with open(queries_path + 'SMI_query_ini_users.sql', 'r') as f:
        query = f.read().format(schema=schema)
    cur.execute(query)
    db_ini_users = pd.DataFrame(cur.fetchall(), columns=list(ini_users_dict.keys()))
    users_ls = db_ini_users["screenName"].to_list()

In [7]:
# Get the list of spanish municipality names used to filter users by location.
# Reuse the configured data directory instead of repeating the literal, and
# decode explicitly as UTF-8 (the JSON encoding) so accented names load the
# same way regardless of the platform's default encoding.
with open(temp_data_path + 'db_munlist.json', encoding='utf-8') as munlist_file:
    db_munlist = json.load(munlist_file)

In [8]:
class UsersPipeline:
    '''
    Generate new users in the users table of the database.

    For a known user: mark it as looked up, download its Twitter friends and
    followers, keep only those whose location mentions a municipality of the
    given list, insert them into the database and write a dated backup.

    NOTE(review): get_ff uses the notebook-global tweepy `api`, and
    users_backup uses the notebook-global `temp_data_path`.
    '''
    def __init__(self,
                queries_path,
                conn,
                schema
                ):
        '''
        params:
            - queries_path: directory prefix (ending in "/") of the .sql files.
            - conn: open psycopg2 connection.
            - schema: database schema substituted into the queries.
        '''
        self.queries_path = queries_path
        self.conn = conn
        self.cur = conn.cursor()
        self.schema = schema

    @staticmethod
    def normalize(name):
        '''
        Function to remove acute accents from an alphanumeric string:
        params:
            - name: character string.
        Output: string with á, é, í, ó, ú (and their uppercase forms) replaced
        by the unaccented vowel.
        '''
        replacements = (
            ("á", "a"),
            ("é", "e"),
            ("í", "i"),
            ("ó", "o"),
            ("ú", "u"),
        )
        for a, b in replacements:
            name = name.replace(a, b).replace(a.upper(), b.upper())
        return(name)

    @classmethod
    def _clean_text(cls, name):
        '''Shared cleaning used by text_cleaner and clean_loc: strip accents, then "ñ" -> "n".'''
        return cls.normalize(name).replace("ñ", 'n')

    def text_cleaner(self, name):
        '''
        Function to adapt the text of the municipalities list.
        parameters:
            - name: municipality.
        Output: municipality without acute accents and with "ñ" replaced by "n".
        '''
        return(self._clean_text(name))

    @staticmethod
    def clean_loc(loc, munlist):
        '''
        Check whether a single location word is a known municipality.
        params:
            - loc: one whitespace-separated word of a user's location field.
            - munlist: container (list or set) of cleaned municipality names.
        Output: 1 if the lower-cased, comma-free, cleaned word is in munlist, else 0.
        '''
        # Resolve the cleaner through the class, not through a notebook-global instance.
        loc = UsersPipeline._clean_text(loc.lower().replace(',', ''))
        return 1 if loc in munlist else 0

    def treat_new_users(self, df, db_munlist):
        '''
        Function to treat the location column and filter non spanish users:
        params:
            - df: Dataframe with the new users (needs 'id' and 'location' columns).
            - db_munlist: list of spanish municipalities to filter the new users.
        Output: Dataframe with only spanish users, 'id' as str, a new
        'ff_lookup' column set to False and a fresh 0..n-1 index.
        '''
        # Set membership is O(1) per word instead of scanning the list each time.
        munset = set(db_munlist)

        def is_spanish(location):
            # A user matches when any word of its location is a municipality.
            # Non-string locations (None/NaN) cannot match.
            if not isinstance(location, str):
                return False
            return any(self.clean_loc(word, munset) for word in location.split())

        # Boolean mask built on df's own index, so non-default indexes and
        # repeated ids are handled without misalignment or row duplication.
        mask = df['location'].map(is_spanish).astype(bool)
        output = df[mask].reset_index(drop=True)
        output = output.astype({"id": str})
        output['ff_lookup'] = False
        return(output)

    @staticmethod
    def twt_transform(list):
        '''
        Function to adapt the API friends and followers information format:
        params:
            - list: sequence of tweepy user objects (each exposing `._json`).
              The name shadows the builtin but is kept for keyword callers.
        Output: dataframe with columns id, screenName, followersCount,
        friendsCount, protected, location, lang (same columns when empty).
        '''
        columns = ['id', 'screenName', 'followersCount', 'friendsCount', 'protected', 'location', 'lang']
        if len(list) == 0:
            return(pd.DataFrame(columns=columns))

        #Extract each element from the json payload:
        data = [x._json for x in list]
        df = pd.DataFrame(data)

        #Reorder and change column names to match the database table:
        df = df[['id', 'screen_name', 'followers_count', 'friends_count', 'protected', 'location', 'lang']]
        df = df.rename(columns={'screen_name':'screenName', 'followers_count':'followersCount', 'friends_count':'friendsCount'})
        return(df)

    def _fetch_connections(self, endpoint, user):
        '''Page through a tweepy endpoint (e.g. api.get_followers) for `user`; return the rows as a dataframe.'''
        items = list(Cursor(endpoint, screen_name=user).items())
        return self.twt_transform(items) if len(items) > 0 else pd.DataFrame()

    def get_ff(self, user):
        '''
        Function to get friends and followers from a given user.
        params:
            - user: twitter user screen name.
        Output: dataframe with one row per distinct friend/follower id; an empty
        dataframe when there are none or when the Twitter API call fails.
        '''
        try:
            #Retrieve followers from a given user:
            print('Twitter API job: Retrieving followers')
            df_followers = self._fetch_connections(api.get_followers, user)
            print('Twitter API job: Number of followers retrieved: ' + str(df_followers.shape[0]))

            #Retrieve friends from a given user:
            print('Twitter API job: Retrieving friends')
            df_friends = self._fetch_connections(api.get_friends, user)
            print('Twitter API job: Number of friends retrieved from user: ' + str(df_friends.shape[0]))

            frames = [frame for frame in (df_followers, df_friends) if frame.shape[0] > 0]
            if not frames:
                # Nothing retrieved: there is no 'id' column to cast.
                return(pd.DataFrame())

            output = pd.concat(frames, axis=0)
            # Someone who both follows and is followed by `user` appears twice.
            output = output.drop_duplicates(subset='id')

            #Adapt id format:
            output = output.astype({"id": object})
            return(output.reset_index(drop=True))

        except Exception as error:
            # Best effort: report and let the caller continue with no new users.
            print('Twitter API job: Error retrieving friends and followers: ' + str(error))
            return(pd.DataFrame())

    @staticmethod
    def filter_usrs_loc(df, munlist):
        '''
        Function to filter the location field given a municipalities list, to ensure spanish users:
        params:
            - df: input dataframe with users information (not modified).
            - munlist: list of municipalities:
        Output: filtered copy whose 'location' is lower-cased; None if an error occurs.
        '''
        try:
            print('Data job: Filtering location of new users.')
            # Lower-case without mutating the caller's frame; non-strings become NaN.
            locations = df['location'].str.lower()
            return(df.assign(location=locations)[locations.isin(munlist)])

        except Exception as error:
            print(error)

    def update_user_lookup(self, user):
        '''
        Function to update the lookup status of a given user via query.
        params:
            - user: screenName of a user.
        Raises: re-raises any file or database error after printing it.
        '''
        try:
            print('Database job: Updating lookup of user')
            with open(self.queries_path + 'SMI_update_lookup_users.sql') as f:
                query = f.read()
            self.cur.execute(
                sql.SQL(query).format(schema=sql.Identifier(self.schema)),
                (user, )
            )
            self.conn.commit()

        except Exception as error:
            print(error)
            # Bare raise keeps the original exception type and traceback.
            raise

    def fetchall_SQL(self, path):
        """
        Function to fetch all observations from a query to the database:
        params:
            - path: relative path to the .sql file; `{schema}` is substituted.
        Output: list of row tuples, or None if the query fails (the error is
        rolled back and reported).
        """
        with open(path, 'r') as f:
            query = f.read().format(schema=self.schema)
        cur = self.conn.cursor()
        try:
            cur.execute(query)
            return(cur.fetchall())

        except Exception as error:
            # psycopg2.DatabaseError subclasses Exception, so DB errors land here too.
            self.conn.rollback()
            logger = getattr(self, 'api_logger', None)
            if logger is not None:
                logger.exception(error)
            else:
                print(error)
            return None

        finally:
            # Release the cursor on success and on error alike.
            cur.close()

    def insert_new_users_into_db(self, user_row):
        '''
        Function to check if a new user exists and insert into db in other case.
        params:
            - user_row: tuple of values passed as parameters to the insert query.
        Raises: re-raises any file or database error after printing it.
        '''
        try:
            with open(self.queries_path + 'SMI_insert_new_user.sql') as f:
                query = f.read()
            self.cur.execute(
                sql.SQL(query).format(schema=sql.Identifier(self.schema)),
                user_row
            )
            self.conn.commit()
        except Exception as error:
            print(error)
            raise

    @staticmethod
    def users_backup(df):
        '''
        Write df to temp_data_path as db_users_<YYYY-MM-DD>.json.
        NOTE(review): the content is CSV (to_csv) despite the .json extension;
        kept unchanged because whatever reads these backups may depend on it.
        '''
        df.to_csv(temp_data_path + 'db_users_' + dt.today().strftime("%Y-%m-%d") + '.json')

    def get_and_insert_new_users(self, user, munlist):
        '''
        Function to expand the users table from the network of a given user.
        params:
            - user: twitter screen name whose friends and followers are explored.
            - munlist: cleaned municipality names used to keep only spanish users.
        Output: None. Errors are printed, not raised, so a loop over users continues.
        '''
        try:
            #Update ff_lookup column from a given user in database:
            self.update_user_lookup(user)

            #Get friends and followers from a given user:
            df_new_users = self.get_ff(user)

            #Insert new users into database:
            if df_new_users is not None and df_new_users.shape[0] > 0:
                df_new_users = self.treat_new_users(df_new_users, munlist)
                print('Database job: Number of new users to be inserted into DB: ', df_new_users.shape[0])
                print('Database job: Insert new users into DB')
                for row in df_new_users.itertuples(index=False, name=None):
                    self.insert_new_users_into_db(row)
                print('Database job: New users inserted into DB')
                print('Data job: Saving new users backup' )
                rows = self.fetchall_SQL(self.queries_path + 'SMI_query_new_users.sql')
                if rows is None:
                    print('Data job: New users backup skipped, query failed')
                else:
                    df = pd.DataFrame(rows, columns=['id', 'screenName', 'followersCount', 'friendsCount', 'protected', 'location', 'lang', 'ff_lookup'])
                    self.users_backup(df)
                    print('Data job: New users backup saved')
            else:
                print('Database job: There are not new users to be inserted into DB')

        except Exception as error:
            print(error)

In [9]:
# Pipeline bound to the query directory, open connection and schema configured above.
upipe = UsersPipeline(queries_path=queries_path, conn=conn, schema=schema)

In [10]:
# Strip accents / "ñ" from the municipality names, as clean_loc does for location words.
# NOTE(review): clean_loc also lower-cases location words, but these names are not
# lower-cased here, so this assumes db_munlist is already lower-case -- confirm.
db_munlist = list(map(upipe.text_cleaner, db_munlist))

In [11]:
# Randomly pick the users whose friends/followers are explored in this run.
# Capped at the population size: random.sample raises ValueError when asked
# for more items than users_ls contains.
n_sample_users = 10
userls = rd.sample(users_ls, min(n_sample_users, len(users_ls)))

In [12]:
# Display the sampled screen names.
userls

['maclilla',
 'GUSTAVOVIEITES',
 'PalacioFuentes',
 'Yasna72',
 'Nel_Pa',
 'casasj',
 'IsaPax',
 'xavimaria',
 'olga_cecilio',
 'casalop']

In [13]:
# Explore each sampled user's network. get_and_insert_new_users catches and
# prints its own errors, so one failing user does not stop the loop.
for user in userls:
    upipe.get_and_insert_new_users(user, db_munlist)

Database job: Updating lookup of user
Twitter API job: Retrieving followers
Twitter API job: Number of followers retrieved: 16
Twitter API job: Retrieving friends
Twitter API job: Number of friends retrieved from user: 11
Database job: Number of new users to be inserted into DB:  13
Database job: Insert new users into DB
Database job: New users inserted into DB
Data job: Saving new users backup:
Data job: New users backup saved
Database job: Updating lookup of user
Twitter API job: Retrieving followers
Twitter API job: Number of followers retrieved: 29
Twitter API job: Retrieving friends
Twitter API job: Number of friends retrieved from user: 32
Database job: Number of new users to be inserted into DB:  20
Database job: Insert new users into DB
Database job: New users inserted into DB
Data job: Saving new users backup:
Data job: New users backup saved
Database job: Updating lookup of user
Twitter API job: Retrieving followers
Twitter API job: Number of followers retrieved: 9
Twitter AP

Rate limit reached. Sleeping for: 59


Twitter API job: Number of friends retrieved from user: 27
Database job: Number of new users to be inserted into DB:  22
Database job: Insert new users into DB
Database job: New users inserted into DB
Data job: Saving new users backup:
Data job: New users backup saved
Database job: Updating lookup of user
Twitter API job: Retrieving followers
Twitter API job: Number of followers retrieved: 17
Twitter API job: Retrieving friends
Twitter API job: Number of friends retrieved from user: 19
Database job: Number of new users to be inserted into DB:  14
Database job: Insert new users into DB
Database job: New users inserted into DB
Data job: Saving new users backup:
Data job: New users backup saved
Database job: Updating lookup of user
Twitter API job: Retrieving followers
Twitter API job: Number of followers retrieved: 16
Twitter API job: Retrieving friends
Twitter API job: Number of friends retrieved from user: 18
Database job: Number of new users to be inserted into DB:  19
Database job: I

In [None]:
# Proxy service that avoids getting blocked (it changes your IP):