# Load data from Google Cloud Storage into MongoDB (Atlas)

In [1]:
import re
import pymongo
import pyspark
from google.cloud import storage
from pyspark.sql import Row, SparkSession
import warnings 
warnings.filterwarnings(action = 'ignore') 

## Connect to MongoDB

#### Local MongoDB configuration

In [2]:
# Connection settings for a MongoDB server running on this machine.
# Replace the placeholder database/collection names before running.
local_ip_address = 'localhost'
local_database_name = 'your_database_name'
local_collection_name = 'your_collection_name'
# Use the configured host instead of repeating the literal, so changing
# `local_ip_address` actually takes effect. 27017 is MongoDB's default port.
local_client = pymongo.MongoClient(local_ip_address, 27017)
local_db = local_client[local_database_name]
local_collection = local_db[local_collection_name]

#### MongoDB Atlas configuration

In [3]:
from urllib.parse import quote_plus

# Connection settings for the MongoDB Atlas cluster.
# Replace the placeholders with your own credentials and cluster host.
mongo_username = 'your_user_name'
mongo_password = 'your_password'
mongo_ip_address = 'your_id_address.mongodb.net/?retryWrites=true&w=majority'
database_name = 'your_database_name'
collection_name = 'your_collection_name'
# Credentials embedded in a MongoDB URI must be percent-escaped; otherwise a
# password containing characters such as '@', ':', '/' or '%' yields an
# invalid connection string.
client = pymongo.MongoClient(
    f"mongodb+srv://{quote_plus(mongo_username)}:{quote_plus(mongo_password)}"
    f"@{mongo_ip_address}"
)
db = client[database_name]
collection = db[collection_name]

## Connect to Google Cloud Storage

In [4]:
# Google Cloud Storage settings: bucket to read from, the service-account key
# used for authentication, and the GCS connector jar handed to Spark.
bucket_name = 'your_bucket_name'
service_account_key_file = 'your_service_account_key.json'
jar_path = 'gcs-connector-hadoop2-latest.jar'

conf = pyspark.SparkConf().set("spark.jars", jar_path)
# `getOrCreate` is a classmethod. Calling it on the class reuses a context
# that is already running (e.g. when this cell is re-executed) instead of
# first constructing a second SparkContext, which raises.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# NOTE: from here on `conf` refers to the Hadoop configuration of the
# context, not to the SparkConf built above.
conf = sc._jsc.hadoopConfiguration()

# Register the GCS connector classes and point them at the key file.
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("google.cloud.auth.service.account.json.keyfile", service_account_key_file)
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Session used by the cells below to read CSV files.
spark = SparkSession.builder.getOrCreate()

23/03/22 23:00:58 WARN Utils: Your hostname, Xinnnnns-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.107 instead (on interface en0)
23/03/22 23:00:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/03/22 23:00:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/22 23:01:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/22 23:01:00 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/03/22 23:01:00 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


## Retrieve the song list and tweets from GCS

To store the data in your local MongoDB, pass `local_collection` to the functions below; to store it in Atlas, pass `collection` instead. In this notebook the data is saved to Atlas, so `collection` is used.

#### Define functions

In [5]:
def get_new_song_df(spark, bucket_name, mongodb, service_account_key_file):
    """Load the song list and then the tweets from a bucket into MongoDB.

    Reads ``whole.csv`` (with a header row) from the bucket, hands the
    resulting DataFrame to ``aggregate_new_songs`` and finally calls
    ``get_tweet_df`` for the same bucket.

    Args:
        spark: Spark session used to read the CSV file.
        bucket_name: Name of the GCS bucket that contains ``whole.csv``.
        mongodb: Collection object passed on to ``aggregate_new_songs`` and
            ``get_tweet_df``.
        service_account_key_file: Key file path forwarded to
            ``get_tweet_df``.
    """
    song_list_path = "".join(("gs://", bucket_name, '/whole.csv'))
    csv_reader = spark.read.format("csv").option("header", True)
    song_df = csv_reader.load(song_list_path)

    aggregate_new_songs(mongodb, song_df)
    get_tweet_df(mongodb, spark, bucket_name, service_account_key_file)
    

def aggregate_new_songs(collection, df):
    """Insert every song of *df* that is not yet stored in *collection*.

    Each row is converted to a dict; its ``id`` value becomes the document's
    ``_id`` and the ``id`` key itself is dropped. Rows whose ``_id`` already
    exists in the collection are left untouched.

    Args:
        collection: Object providing ``find_one`` and ``insert_one``
            (a pymongo collection in this notebook).
        df: Object whose ``collect()`` yields rows offering ``asDict()``
            (a Spark DataFrame in this notebook). Every row must have an
            ``id`` field.

    Raises:
        KeyError: If a row has no ``id`` field.
    """
    for row in df.collect():
        song = row.asDict()
        song_id = song.pop('id')
        # Only existence matters, so project `_id` alone: this avoids
        # downloading the full document (which may carry a large `tweets`
        # array) just to compare the result against an empty list.
        if collection.find_one({'_id': song_id}, {'_id': 1}) is None:
            song['_id'] = song_id
            collection.insert_one(song)
            

def get_tweet_df(collection, spark, bucket_name, service_account_key_file,
                 date_prefix='2023-03-21'):
    """Push the tweets of every per-song CSV file in a bucket to MongoDB.

    Lists the blobs of *bucket_name* whose name starts with *date_prefix*
    and ends with ``.csv``, reads each one with Spark and passes every row
    to ``push_tweets`` in ``"one"`` mode. The song name is the blob name
    without the ``.csv`` suffix and without a leading ``<digits>-<digits>-
    <digits>/`` folder. Progress is printed per file.

    Args:
        collection: Collection object handed to ``push_tweets``.
        spark: Spark session used to read the CSV files.
        bucket_name: Name of the GCS bucket to scan.
        service_account_key_file: Path of the service-account JSON key used
            to create the storage client.
        date_prefix: Only blobs whose name starts with this prefix are
            loaded. Defaults to the date that used to be hard-coded here;
            change it to control which folder is processed.
    """
    storage_client = storage.Client.from_service_account_json(service_account_key_file)
    # Let GCS filter by prefix instead of listing the whole bucket.
    blobs = storage_client.list_blobs(bucket_name, prefix=date_prefix)
    # Match the real extension ('.csv'), not any name merely ending in 'csv'.
    csv_names = [blob.name for blob in blobs if blob.name.endswith('.csv')]

    date_folder = re.compile(r'^[0-9]+-[0-9]+-[0-9]+/')
    for blob_name in csv_names:
        # Use the blob name verbatim; passing it through repr() could add
        # backslash escapes that do not belong to the object path.
        df = (spark.read.format("csv")
              .option("header", True)
              .option('multiLine', True)
              .load("gs://" + bucket_name + '/' + blob_name))
        # Strip only the trailing extension, leaving any '.csv' that occurs
        # inside the song title itself.
        song_name = blob_name[:-len('.csv')]
        print(song_name)
        song_name = date_folder.sub('', song_name)
        for row in df.collect():
            push_tweets("one", collection, song_name, row.asDict())
        print('done')

            
def push_tweets(type_, collection, song_name, one_content_dict):
    """Append one tweet to the ``tweets`` array of the song *song_name*.

    Args:
        type_: When equal to ``'one'``, the tweet is skipped if any document
            in the collection already holds a tweet with the same
            ``tweet_id``. Any other value pushes without checking.
        collection: Object providing ``find_one`` and ``update_one``
            (a pymongo collection in this notebook).
        song_name: Value matched against the ``name`` field of the song
            document to update.
        one_content_dict: The tweet to append. Must contain ``tweet_id``
            when *type_* is ``'one'``.

    Raises:
        KeyError: If *type_* is ``'one'`` and the tweet has no ``tweet_id``.
    """
    if type_ == 'one':
        tweet_id = one_content_dict['tweet_id']
        # Only existence matters: fetch at most one document and project
        # `_id` alone rather than materialising every matching song with its
        # complete tweet list.
        duplicate = collection.find_one({'tweets.tweet_id': tweet_id}, {'_id': 1})
        if duplicate is not None:
            return
    collection.update_one({'name': song_name},
                          {'$push': {'tweets': one_content_dict}})

#### Read csv file (the song list)

In [6]:
# Read the song list (CSV with a header row) from the bucket and insert the
# songs that are not yet present in the Atlas collection.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("gs://" + bucket_name + '/whole.csv')
)
aggregate_new_songs(collection, df)

                                                                                

In [7]:
# Number of documents currently stored in the collection (empty filter
# matches every document).
collection.count_documents({})

1798

#### Read folders (tweets for each song)

In [8]:
# Push the tweets from the bucket's per-song CSV files into the Atlas
# collection; the function prints each file name followed by 'done'.
get_tweet_df(collection, spark, bucket_name, service_account_key_file)

2023-03-21/Abadi
done
2023-03-21/BUBBLY
done
2023-03-21/Crying On The Dancefloor
done
2023-03-21/Did you dream too
done
2023-03-21/Enough
done
2023-03-21/Gelosa (feat. Shiva, Sfera Ebbasta, Guè)
done
2023-03-21/ICU
done
2023-03-21/ORIGAMI ALL'ALBA - CLARA
done
2023-03-21/Players - DJ Smallz 732 - Jersey Club Remix
done
2023-03-21/Strangest Sea
done


                                                                                

2023-03-21/The April Skies
done
2023-03-21/To Be Yours (feat. Claud)
done
2023-03-21/Treat People With Kindness


                                                                                

done
2023-03-21/Wrapped Around Your Finger
done


#### Change weird song names

In [9]:
# local_collection.update_one({'name':"NO voy a llorar :')" },
#                             {'$set':{'name':'NO voy a llorar'}})

## Move data from Atlas to local MongoDB

In [10]:
# all_doc = collection.find({})
# local_collection.insert_many(list(all_doc))

In [11]:
# local_collection.count_documents({})