In [1]:
import os
import io
import json
import pymongo
from pprint import pprint as pp
import csv
from collections import namedtuple
import time

In [4]:
#save json file, json serialization
class IO_json(object):
    
    def __init__(self, filepath, filename, filesuffix='json'):
        self.filepath = filepath        # /path/to/file  without the '/' at the end
        self.filename = filename        # FILE_NAME
        self.filesuffix = filesuffix
        # self.file_io = os.path.join(dir_name, '.'.join((base_filename, filename_suffix)))

    def save(self, data):
        if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
            # Append existing file
            with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'a', encoding='utf-8') as f:
                f.write(unicode(json.dumps(data, ensure_ascii= False))) 
                # In python 3, there is no "unicode" function 
                #f.write(json.dumps(data, ensure_ascii= False)) # create a \" escape char for " in the saved file        
        else:
            # Create new file
            with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'w', encoding='utf-8') as f:
                f.write(unicode(json.dumps(data, ensure_ascii= False)))
                # f.write(json.dumps(data, ensure_ascii= False))    

    def load(self):
        with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), encoding='utf-8') as f:
            return f.read()

In [51]:
class IO_csv(object):

    def __init__(self, filepath, filename, filesuffix='csv'):
        self.filepath = filepath       # /path/to/file  without the '/' at the end
        self.filename = filename       # FILE_NAME
        self.filesuffix = filesuffix
         # self.file_io = os.path.join(dir_name, '.'.join((base_filename, filename_suffix)))

    def save(self, data, NTname, fields):
        # NTname = Name of the NamedTuple
        # fields = header of CSV - list of the fields name
        NTuple = namedtuple(NTname, fields)

        if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
            # Append existing file
            with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'ab') as f:
                writer = csv.writer(f)
                
                writer.writerows([row for row in map(NTuple._make, data)])
                # list comprehension using map on the NamedTuple._make() iterable and the data file to be saved
                # Notice writer.writerows and not writer.writerow (i.e. list of multiple rows sent to csv file
        else:
            # Create new file
            with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'wb') as f:
                writer = csv.writer(f)
                
                writer.writerow(fields) # fields = header of CSV - list of the fields name

                writer.writerows([row for row in map(NTuple._make, data)])
                #  list comprehension using map on the NamedTuple._make() iterable and the data file to be saved
                # Notice writer.writerows and not writer.writerow (i.e. list of multiple rows sent to csv file

    def load(self, NTname, fields):

        NTuple = namedtuple(NTname, fields)

        with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix),'rU') as f:
            reader = csv.reader(f)
            for row in map(NTuple._make, reader):
                # Using map on the NamedTuple._make() iterable and the reader file to be loaded
                yield row

In [10]:
from pymongo import MongoClient as MCli

class IO_mongo(object):
    conn={'host':'localhost', 'ip':'27017'}
    
    def __init__(self, db='twtr_db', coll='twtr_coll', **conn ):
        # Connects to the MongoDB server
        self.client = MCli(**conn)
        self.db = self.client[db]
        self.coll = self.db[coll]
    
    def save(self, data):
        # Insert to collection in db
        return self.coll.insert(data)
    
    def load(self, return_cursor=False, criteria=None, projection=None):
        if criteria is None:
            criteria = {}
        if projection is None:
            cursor = self.coll.find(criteria)
        else:
            cursor = self.coll.find(criteria, projection)
        # Return a cursor for large amounts of data
        if return_cursor:
            return cursor
        else:
            return [ item for item in cursor ]
    
    

In [8]:
#fields01 es schema o el header para utilizar csv
fields01 = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text', 'url']
#create namedTuple
Tweet01 = namedtuple('Tweet01',fields01)

def parse_tweet(data):
    #"""
    #Parse a ``tweet`` from the given response data.
    #"""
    return Tweet01(id=data.get('id', None),
                   created_at=data.get('created_at', None),
                   user_id=data.get('user_id', None),
                   user_name=data.get('user_name', None),
                   tweet_text=data.get('tweet_text', None),
                   url=data.get('url'))

In [7]:
#Helper Methods

def parse_date(s):
    return time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(s,'%a %b %d %H:%M:%S +0000 %Y'))

def parse_geo(g,index):
    try:
        return str(g["geo"]["coordinates"][index])
    except:
        return ""

def extract_tweet(statuses):
    return [ {'id':status['id'],
            'created_at' :parse_date(status['created_at']),
            'user_id'    :status['user']['id'],
            'user_name'  :status['user']['name'], 
            'tweet_text' :status['text'].encode('utf-8'),
            'url':url['expanded_url']} 
            for status in statuses
                for url in status['entities']['urls'] ]

def extract_tweet_noURL(statuses):
    return [ {'id':status['id'],
              'created_at' :parse_date(status['created_at']),
              'user_id'    :status['user']['id'],
              'user_name'  :status['user']['name'],
              'tweet_text' :status['text'].encode('utf-8') }
                for status in statuses ]

## conection to twitter class

In [12]:
import twitter
import urlparse # python 2.7
# import urllib # python 3.0
import logging
import time
from datetime import datetime

In [98]:
class TwitterAPI(object):
#"""
#TwitterAPI class allows the Connection to Twitter via OAuth
#once you have registered with Twitter and receive the
#necessary credentials
#"""
    def __init__(self):
        consumer_key = '7cafpvyNC3G5jBy7mA27bx9Oo'
        consumer_secret = 'BnomaQh2WNblSaRrfmWZuKaWy60KOYqIr9Jy1gomFEWJWz1vr7'
        access_token = '97539286-QWma2TH9cCBSLItuavZeoo11GCsiaDp7Lm9dpxrrE'
        access_secret = 'sEcvyLDjlHpUmng7wo05iWAFXuLu29WXmPFXY5k7IZ0SN'
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_secret = access_secret
        self.retries = 3
        self.auth = twitter.oauth.OAuth(access_token, access_secret, consumer_key, consumer_secret)
        self.api = twitter.Twitter(auth=self.auth)
        
        #IMPORTANTE se crea un log 
        # logger initialisation
        appName = 'twt150530'
        self.logger = logging.getLogger(appName)
        # create console handler and set level to debug
        logPath = '/home/juandavid/Documentos/archivosIpython/Chpater3/LogChapter3'
        fileName = appName
        fileHandler = logging.FileHandler("{0}/{1}.log".format(logPath, fileName))
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fileHandler.setFormatter(formatter)
        self.logger.addHandler(fileHandler)
        self.logger.setLevel(logging.DEBUG)
        
        # Save to JSON file initialisation
        jsonFpath = '/home/juandavid/Documentos/archivosIpython/Chpater3/data'
        jsonFname = 'twtr15053001'
        self.jsonSaver = IO_json(jsonFpath, jsonFname)
        
        # Save to MongoDB Intitialisation
        self.mongoSaver = IO_mongo(db='twtr01_db', coll='twtr01_coll')
        
    def searchTwitter(self, q, max_res=10,**kwargs):
        search_results = self.api.search.tweets(q=q, count=10, **kwargs)
        statuses = search_results['statuses']
        max_results = min(1000, max_res)
        
        for _ in range(10):
            try:
                next_results = search_results['search_metadata']['next_results']
                # self.logger.info('info' in searchTwitter - next_results:%s'% next_results[1:])
            except KeyError as e:
                self.logger.error('error in searchTwitter: %s' %(e))
                break
            
            next_results = urlparse.parse_qsl(next_results[1:]) # python 2.7
            #next_results = urllib.parse.parse_qsl(next_results[1:]) python 3.0
            # self.logger.info('info' in searchTwitter - next_results[max_id]:', next_results[0:])
            kwargs = dict(next_results)
            # self.logger.info('info' in searchTwitter - next_results[max_id]:%s'% kwargs['max_id'])
            search_results = self.api.search.tweets(**kwargs)
            statuses += search_results['statuses']
            self.saveTweets(search_results['statuses'])
            if len(statuses) > max_results:
                self.logger.info('info in searchTwitter - got %i tweets - max: %i' %(len(statuses), max_results))
                break
        return statuses
        
    def saveTweets(self, statuses):
        # Saving to JSON File
        self.jsonSaver.save(statuses)

        # Saving to MongoDB
        for s in statuses:
            self.mongoSaver.save(s)
    
    def parseTweets(self, statuses):
        return [(status['id'],
            status['created_at'],
            status['user']['id'],
            status['user']['name'],
            status['text'],
            url['expanded_url'])
                for status in statuses
                    for url in status['entities']['urls'] ]
    
    def getTweets(self, q, max_res=10):
        #"""
        #Make a Twitter API call whilst managing rate limit and errors.
        #"""
        def handleError(e, wait_period=2, sleep_when_rate_limited=True):
            if wait_period > 3600: # Seconds
                self.logger.error('Too many retries in getTweets: %s' %(e))
                raise e
            if e.e.code == 401:
                self.logger.error('error 401 * Not Authorised * in getTweets: %s' %(e))
                return None
            elif e.e.code == 404:
                self.logger.error('error 404 * Not Found * in getTweets: %s' %(e))
                return None
            elif e.e.code == 429:
                self.logger.error('error 429 * API Rate Limit Exceeded * in getTweets: %s' %(e))
                if sleep_when_rate_limited:
                    self.logger.error('error 429 * Retrying in 15 minutes * in getTweets: %s' %(e))
                    sys.stderr.flush()
                    time.sleep(60*15 + 5)
                    self.logger.info('error 429 * Retrying now * in getTweets: %s' %(e))
                    return 2
                else:
                    raise e # Caller must handle the rate limiting issue
            elif e.e.code in (500, 502, 503, 504):
                self.logger.info('Encountered %i Error. Retrying in %i seconds' % (e.e.code, wait_period))
                time.sleep(wait_period)
                wait_period *= 1.5
                return wait_period
            else:
                self.logger.error('Exit - aborting - %s' %(e))
        while True:
            try:
                self.searchTwitter( q, max_res=10)
            except twitter.api.TwitterHTTPError as e:
                error_count = 0
                wait_period = handleError(e, wait_period)
                if wait_period is None:
                    return  
        
        
        

### Make Search

In [99]:
t= TwitterAPI()

In [100]:
query = "RondaRousey"
tsearch = t.searchTwitter(query)

INFO:twt150530:info in searchTwitter - got 20 tweets - max: 10


In [101]:
#check if insert in os system
# Check file exist in the path specified
jsonFpath = '/home/juandavid/Documentos/archivosIpython/Chpater3/data'
jsonFname = 'twtr15053001'
jsonSuffix = 'json'
os.path.isfile('{0}/{1}.{2}'.format(jsonFpath, jsonFname, jsonSuffix))

True

In [102]:
# load tweets from file as a text string
twts_ld = IO_json(jsonFpath, jsonFname).load()
# Convert loaded tweets to Json
twts_js = json.loads(twts_ld)
#show firts tweet
pp(twts_js[1])

{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Fri Jan 01 14:59:57 +0000 2016',
 u'entities': {u'hashtags': [],
               u'media': [{u'display_url': u'pic.twitter.com/fl49qzWIYO',
                           u'expanded_url': u'http://twitter.com/Braddok2070/status/670606095178702848/photo/1',
                           u'id': 670606093622493188,
                           u'id_str': u'670606093622493188',
                           u'indices': [30, 53],
                           u'media_url': u'http://pbs.twimg.com/media/CU54sGqUwAQMF7Y.jpg',
                           u'media_url_https': u'https://pbs.twimg.com/media/CU54sGqUwAQMF7Y.jpg',
                           u'sizes': {u'large': {u'h': 375,
                                                 u'resize': u'fit',
                                                 u'w': 500},
                                      u'medium': {u'h': 375,
                                                  u'resize': u'fit',
         

In [37]:
# Extract key information from Tweets
twts_ls_no_url = extract_tweet_noURL(twts_js)
pp(twts_ls_no_url)

[{'created_at': '2016-01-01 11:34:29',
  'id': 682887613808775168,
  'tweet_text': 'RT @Talend: 2016 will be the year of #BigData &amp; #IoT! See how #Hadoop &amp; #ApacheSpark play in https://t.co/dKAi92BjRR @acole602 https://t.co\xe2\x80\xa6',
  'user_id': 598221206,
  'user_name': u'Omar Agha'},
 {'created_at': '2016-01-01 11:31:01',
  'id': 682886743159386112,
  'tweet_text': '"Yahoo! Benchmarks #ApacheFlink, #ApacheSpark &amp; #ApacheStorm" @infoQ https://t.co/CUqN0HL3BU #fastdata #bigdata #softwarearchitecture',
  'user_id': 4516811547,
  'user_name': u'Talks4Nerds'},
 {'created_at': '2016-01-01 10:50:51',
  'id': 682876632688885760,
  'tweet_text': 'RT @Talend: 2016 will be the year of #BigData &amp; #IoT! See how #Hadoop &amp; #ApacheSpark play in https://t.co/dKAi92BjRR @acole602 https://t.co\xe2\x80\xa6',
  'user_id': 3588748576,
  'user_name': u'Eccella Solutions'},
 {'created_at': '2016-01-01 10:14:58',
  'id': 682867605540073472,
  'tweet_text': '@noootsab @DataFellas @Spa

In [38]:
# Extract key information from Tweets
twts_ls_url = extract_tweet(twts_js)
pp(twts_ls_url)

[{'created_at': '2016-01-01 11:34:29',
  'id': 682887613808775168,
  'tweet_text': 'RT @Talend: 2016 will be the year of #BigData &amp; #IoT! See how #Hadoop &amp; #ApacheSpark play in https://t.co/dKAi92BjRR @acole602 https://t.co\xe2\x80\xa6',
  'url': u'http://bit.ly/1OIB1r9',
  'user_id': 598221206,
  'user_name': u'Omar Agha'},
 {'created_at': '2016-01-01 11:31:01',
  'id': 682886743159386112,
  'tweet_text': '"Yahoo! Benchmarks #ApacheFlink, #ApacheSpark &amp; #ApacheStorm" @infoQ https://t.co/CUqN0HL3BU #fastdata #bigdata #softwarearchitecture',
  'url': u'http://bit.ly/1R3E9UU',
  'user_id': 4516811547,
  'user_name': u'Talks4Nerds'},
 {'created_at': '2016-01-01 10:50:51',
  'id': 682876632688885760,
  'tweet_text': 'RT @Talend: 2016 will be the year of #BigData &amp; #IoT! See how #Hadoop &amp; #ApacheSpark play in https://t.co/dKAi92BjRR @acole602 https://t.co\xe2\x80\xa6',
  'url': u'http://bit.ly/1OIB1r9',
  'user_id': 3588748576,
  'user_name': u'Eccella Solutions'},
 {'cr

In [60]:
# Create list of Tweets NamedTuple (by passing list of tweets through function parseTweets
twts_nt_url =[parse_tweet(t) for t in twts_ls_url]
print(twts_nt_url[2])

Tweet01(id=682876632688885760, created_at='2016-01-01 10:50:51', user_id=3588748576, user_name=u'Eccella Solutions', tweet_text='RT @Talend: 2016 will be the year of #BigData &amp; #IoT! See how #Hadoop &amp; #ApacheSpark play in https://t.co/dKAi92BjRR @acole602 https://t.co\xe2\x80\xa6', url=u'http://bit.ly/1OIB1r9')


### read in csv

In [46]:
csvFpath = '/home/juandavid/Documentos/archivosIpython/Chpater3/data'
csvFname = 'twtr15051401'
csvSuffix = 'csv'

In [58]:
# Instantiate the CSV IO object
twts_csv = IO_csv(csvFpath, csvFname, csvSuffix)
# Tweet NamedTuple definitions to be passed to CSV as parameters
fields = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text', 'url']
Tweet_NT = 'Tweet01'

In [59]:
# Executed the save twice - first in csv file create mode follwed by append mode
twts_csv.save(twts_nt_url, Tweet_NT, fields)
#lo salva solo una linea aunque saca error

UnicodeEncodeError: 'ascii' codec can't encode character u'\xf6' in position 1: ordinal not in range(128)

In [61]:
#read csv files
twts_csv_read = [t for t in twts_csv.load(Tweet_NT, fields)]

In [62]:
import numpy as np
import pandas as pd
from blaze import Data, by, join, merge
from odo import odo

In [63]:
#create pandas dataframe and show
twts_pd_df = pd.DataFrame(twts_csv_read, columns=Tweet01._fields)
twts_pd_df.head()

Unnamed: 0,id,created_at,user_id,user_name,tweet_text,url
0,id,created_at,user_id,user_name,tweet_text,url
1,682887613808775168,2016-01-01 11:34:29,598221206,Omar Agha,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9
2,682886743159386112,2016-01-01 11:31:01,4516811547,Talks4Nerds,"""Yahoo! Benchmarks #ApacheFlink, #ApacheSpark ...",http://bit.ly/1R3E9UU
3,682876632688885760,2016-01-01 10:50:51,3588748576,Eccella Solutions,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9


In [64]:
# remove first row as it is a duplicate with the header
twts_pd_df = twts_pd_df.drop(twts_pd_df.index[:1])
twts_pd_df.describe()

Unnamed: 0,id,created_at,user_id,user_name,tweet_text,url
count,3,3,3,3,3,3
unique,3,3,3,3,2,2
top,682876632688885760,2016-01-01 10:50:51,4516811547,Omar Agha,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9
freq,1,1,1,1,2,2


In [68]:
# Blaze dataframe
twts_bz_df = Data(twts_pd_df)
twts_bz_df

Unnamed: 0,id,created_at,user_id,user_name,tweet_text,url
1,682887613808775168,2016-01-01 11:34:29,598221206,Omar Agha,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9
2,682886743159386112,2016-01-01 11:31:01,4516811547,Talks4Nerds,"""Yahoo! Benchmarks #ApacheFlink, #ApacheSpark ...",http://bit.ly/1R3E9UU
3,682876632688885760,2016-01-01 10:50:51,3588748576,Eccella Solutions,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9


In [69]:
#show data
twts_bz_df.data

Unnamed: 0,id,created_at,user_id,user_name,tweet_text,url
1,682887613808775168,2016-01-01 11:34:29,598221206,Omar Agha,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9
2,682886743159386112,2016-01-01 11:31:01,4516811547,Talks4Nerds,"""Yahoo! Benchmarks #ApacheFlink, #ApacheSpark ...",http://bit.ly/1R3E9UU
3,682876632688885760,2016-01-01 10:50:51,3588748576,Eccella Solutions,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9


In [70]:
#extract fields
twts_bz_df[['id', 'user_name','tweet_text']].distinct()

Unnamed: 0,id,user_name,tweet_text
0,682887613808775168,Omar Agha,RT @Talend: 2016 will be the year of #BigData ...
1,682886743159386112,Talks4Nerds,"""Yahoo! Benchmarks #ApacheFlink, #ApacheSpark ..."
2,682876632688885760,Eccella Solutions,RT @Talend: 2016 will be the year of #BigData ...


### utilizar odo
    

In [73]:
#create a blaze dataframe from the csv file created
filepath   = csvFpath
filename   = csvFname
filesuffix = csvSuffix

twts_df = Data('{0}/{1}.{2}'.format(filepath, filename, filesuffix))

In [74]:
twts_df.head(1)

Unnamed: 0,id,created_at,user_id,user_name,tweet_text,url
0,682887613808775168,2016-01-01 11:34:29,598221206,Omar Agha,RT @Talend: 2016 will be the year of #BigData ...,http://bit.ly/1OIB1r9


In [75]:
#get distinct rows
twts_odo_distinct_df = twts_df[['id', 'user_name', 'user_id', 'tweet_text', 'created_at']].distinct()

#filepath to save json 
jsonFpath = '/home/juandavid/Documentos/archivosIpython/Chpater3/data'
jsonFname = 'twtr15051401_distinct' ## !! twtr15051401 reduced to only the distinct tweets
jsonSuffix = 'json'

#convert Blaze dataframe to json
#Odo(source, target)
odo(twts_odo_distinct_df, '{0}/{1}.{2}'.format(jsonFpath, jsonFname, jsonSuffix))

<odo.backends.json.JSONLines at 0x7f4ecbe6fbd0>

In [80]:
#load json created with odo
twts_odo_json_import = IO_json(jsonFpath, jsonFname).load()


In [81]:
twts_odo_json_import
#no sirve utilizar los metodos extracttweet porque este json 
#tiene menos columnas

u'{"user_id": 598221206, "created_at": "2016-01-01T11:34:29Z", "user_name": "Omar Agha", "id": 682887613808775168, "tweet_text": "RT @Talend: 2016 will be the year of #BigData &amp; #IoT! See how #Hadoop &amp; #ApacheSpark play in https://t.co/dKAi92BjRR @acole602 https://t.co\\u2026"}\n{"user_id": 4516811547, "created_at": "2016-01-01T11:31:01Z", "user_name": "Talks4Nerds", "id": 682886743159386112, "tweet_text": "\\"Yahoo! Benchmarks #ApacheFlink, #ApacheSpark &amp; #ApacheStorm\\" @infoQ https://t.co/CUqN0HL3BU #fastdata #bigdata #softwarearchitecture"}\n{"user_id": 3588748576, "created_at": "2016-01-01T10:50:51Z", "user_name": "Eccella Solutions", "id": 682876632688885760, "tweet_text": "RT @Talend: 2016 will be the year of #BigData &amp; #IoT! See how #Hadoop &amp; #ApacheSpark play in https://t.co/dKAi92BjRR @acole602 https://t.co\\u2026"}\n'

In [83]:
# de json to csv
csvFpath  = '/home/juandavid/Documentos/archivosIpython/Chpater3/data'
csvFname  = 'twtr15052401_all' ## !! twtr15051401 reduced to only the distinct tweets
csvSuffix = 'csv'

odo('{0}/{1}.{2}'.format(jsonFpath, jsonFname, jsonSuffix), '{0}/{1}.{2}'.format(csvFpath, csvFname, csvSuffix))

<odo.backends.csv.CSV at 0x7f4ecbdafc50>

In [84]:
#read csv created by odo
twts_csv_odo_df = Data('{0}/{1}.{2}'.format(csvFpath, csvFname, csvSuffix))

In [85]:
twts_csv_odo_df.head(1)

Unnamed: 0,created_at,id,tweet_text,user_id,user_name
0,2016-01-01 11:34:29,682887613808775168,RT @Talend: 2016 will be the year of #BigData ...,598221206,Omar Agha


### Spark Sql

In [87]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row

sqlc = SQLContext(sc)

In [91]:
twts_sql_df_01 = sqlc.jsonFile("/home/juandavid/Documentos/archivosIpython/Chpater3/data/twtr15051401_distinct.json")

In [92]:
twts_sql_df_01.show()

+--------------------+------------------+--------------------+----------+-----------------+
|          created_at|                id|          tweet_text|   user_id|        user_name|
+--------------------+------------------+--------------------+----------+-----------------+
|2016-01-01T11:34:29Z|682887613808775168|RT @Talend: 2016 ...| 598221206|        Omar Agha|
|2016-01-01T11:31:01Z|682886743159386112|"Yahoo! Benchmark...|4516811547|      Talks4Nerds|
|2016-01-01T10:50:51Z|682876632688885760|RT @Talend: 2016 ...|3588748576|Eccella Solutions|
+--------------------+------------------+--------------------+----------+-----------------+



In [93]:
twts_sql_df_01.select('user_name').show()

+-----------------+
|        user_name|
+-----------------+
|        Omar Agha|
|      Talks4Nerds|
|Eccella Solutions|
+-----------------+



In [95]:
#create sql statements
twts_sql_df_01.registerTempTable('tweets_01')
twts_sql_df_01_selection = sqlc.sql("SELECT * FROM tweets_01 WHERE user_name <> 'Omar Agha'")

twts_sql_df_01_selection.show()

+--------------------+------------------+--------------------+----------+-----------------+
|          created_at|                id|          tweet_text|   user_id|        user_name|
+--------------------+------------------+--------------------+----------+-----------------+
|2016-01-01T11:31:01Z|682886743159386112|"Yahoo! Benchmark...|4516811547|      Talks4Nerds|
|2016-01-01T10:50:51Z|682876632688885760|RT @Talend: 2016 ...|3588748576|Eccella Solutions|
+--------------------+------------------+--------------------+----------+-----------------+



In [103]:
#READ THE ORIGINAL json file 
jsonFpath = '/home/juandavid/Documentos/archivosIpython/Chpater3/data'
jsonFname = 'twtr15053001'
jsonSuffix = 'json'
infile = ('{0}/{1}.{2}'.format(jsonFpath, jsonFname, jsonSuffix))

twts_sc_json_01 = sc.textFile(infile).map(lambda x: json.loads(x))

In [105]:
#create with sql spark context
tweets_sqlc_inf = sqlc.jsonFile(infile)

In [109]:
tweets_sqlc_inf.columns

['contributors',
 'coordinates',
 'created_at',
 'entities',
 'favorite_count',
 'favorited',
 'geo',
 'id',
 'id_str',
 'in_reply_to_screen_name',
 'in_reply_to_status_id',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id',
 'in_reply_to_user_id_str',
 'is_quote_status',
 'lang',
 'metadata',
 'place',
 'possibly_sensitive',
 'retweet_count',
 'retweeted',
 'retweeted_status',
 'source',
 'text',
 'truncated',
 'user']

In [111]:
#extract only the important twitter columns
tweets_extract_sqlc = tweets_sqlc_inf[['created_at', 'id_str', 'text', 'user.id', 'user.name', 'entities.urls.expanded_url']].distinct()


In [113]:
tweets_extract_sqlc.show()

+--------------------+------------------+--------------------+----------+-------------------+--------------------+
|          created_at|            id_str|                text|        id|               name|        expanded_url|
+--------------------+------------------+--------------------+----------+-------------------+--------------------+
|Fri Jan 01 14:40:...|682934358446059520|Lovely piece on @...|3072266542|    real girl sport|[https://sports.v...|
|Fri Jan 01 14:39:...|682934293002338304|@RondaRousey @Bur...|3385234989|John Wesley Hendren|                  []|
|Fri Jan 01 14:54:...|682937906814570496|RT @AnthonyMiyaza...|1244703216|          Bob Eager|[http://bit.ly/fi...|
|Fri Jan 01 14:59:...|682939332173287424|RT @Braddok2070: ...|3472261573|         Maryori115|                  []|
|Fri Jan 01 14:59:...|682939321024774144|RT @Braddok2070: ...|3472261573|         Maryori115|                  []|
|Fri Jan 01 14:39:...|682934284936691713|RT @ArmbarNation:...|3377744919|   Rond