In [1]:
import pandas as pd
import glob
import networkx as nx
import matplotlib.pyplot as plt
from itertools import combinations, islice
import csv
from datetime import datetime, timedelta
import time

# parallelization
import multiprocessing as mp
import threading
import queue
global lck 
lck = threading.Lock()
import tqdm


In [9]:
G = nx.read_gpickle("YahooMusic.gpickle")

In [2]:
# from neuralogic.nn import get_evaluator
# from neuralogic.core import Backend
# from neuralogic.core import Relation, R, Template, Var, V, Term
# from neuralogic.core.settings import Settings, Optimizer
# from neuralogic.utils.data import Dataset

### EDA

In [3]:
# data source: https://www.kaggle.com/c/ee627a-2019fall/data?select=albumData2.txt
# origin of data: https://webscope.sandbox.yahoo.com/catalog.php?datatype=c&guccounter=1&guce_referrer=aHR0cHM6Ly9naXRodWIuY29tL3NhcmFueWF2c3IvTXVzaWMtUmVjb21tZW5kYXRpb25z&guce_referrer_sig=AQAAADdDVj1NcJ7l9D0AF1OwjrIchcuyq2aDD8kc4qxRk3RP-B1mQTaY0IDliV2wsC-gQw05v-d9k8v70efaNULAbemXR_upER5MDVS8mcDsU_DQJZmtcUF8Sdh7A1holj3I-8UJVcKbI65keJp44o46CL8aGp2kLYhRCUYeTXkwxv9N


data_list = glob.glob('ee627a-2019fall\*')
data_list

[]

- trainItem2.txt - the training set 
- testItem2.txt - the test set 
- sample_ submission.csv - a sample submission file in the correct format 
- trackData2.txt -- Track information formatted as: <'TrackId'>|<'AlbumId'>|<'ArtistId'>|<'Optional GenreId_1'>|...|<'Optional GenreId_k'> 
- albumData2.txt -- Album information formatted as: <'AlbumId'>|<'ArtistId'>|<'Optional GenreId_1'>|...|<'Optional GenreId_k'> 
- artistData2.txt -- Artist listing formatted as: <'ArtistId'>
- genreData2.txt -- Genre listing formatted as: <'GenreId'>



In [2]:
# create empty graph
G = nx.Graph()

# process tracks data file
for row in open('ee627a-2019fall/trackData2.txt'):
    row = row.strip('\n').split('|')
    G.add_node(row[0], attr = {"node_type": "track"}) # add track node
    G.add_edge(row[0], row[1], weight=100) # connect track to album
    G.add_node(row[1], attr = {"node_type": "album"}) # add album node
    G.add_node(row[2], attr = {"node_type": "artist"}) # add artist node
    if len(row) > 3:
        for genre in row[3:]:
            G.add_node(genre, attr = {"node_type": "genre"}) # add genre node
            G.add_edge(row[0], genre, weight=100) # connect each genre to the track

In [3]:
# process albums data file
for row in open('ee627a-2019fall/albumData2.txt'):
    row = row.strip('\n').split('|')
    G.add_edge(row[0], row[1], weight=100) # connect album to artist
    if len(row) > 2:
        for genre in row[2:]:
            G.add_node(genre, attr = {"node_type": "genre"}) # add genre node
            G.add_edge(row[0], genre, weight=100) # connect each genre to the album

In [4]:
# process training user data file
for row in open('ee627a-2019fall/trainItem2.txt'):
    if '|' in row:
        cur_user = row.strip('\n').split('|')[0] # pull user ID. don't need song count
        G.add_node(cur_user, attr = {"node_type": "user"}) # add user node
        continue # skip to the user's ratings
    row = row.strip('\n').split('\t')
    G.add_edge(cur_user, row[0], weight=int(row[1])) # connect user to song with rating as edge weight

In [5]:
print(len(G.nodes))
print(len(G.edges)) # 956820
print(len([1 for cc in nx.connected_components(G)]))


296101
13342506
5


In [6]:
nx.write_gpickle(G, "YahooMusic.gpickle")


### G2 selects the largest subgraph. appears to just drop 4 disconnected nodes
not currently needed because switched to astar, which is ok with weakly connected graphs  
don't want to process if not necessary because the 2nd graph takes about 1.5GB of memory

### Processing attempt 1. estimated time to completion of 20 days

### Processing attempt 2. parallelized - ETC 6 days

In [17]:
df_1strun = pd.read_csv('recommendations-Copy1.csv')
df_1strun['TrackID'] = df_1strun['TrackID'].str.split("_", n = 1, expand = True)

completed_users = df_1strun.TrackID.unique()

In [7]:
def write_user(package, outfile='./recommendations.csv'):  
    with open(outfile, 'a',encoding='utf-8-sig', newline='\n') as g:
        keys = package[0].keys() # get keys off first rating
        dict_writer = csv.DictWriter(g, keys)
        dict_writer.writerows(package)


def process_user(i, infile='ee627a-2019fall/testItem2.txt'):
    with open(infile, "r") as f:
        lines_gen = islice(f, i*7, (i+1)*7) # get line index for userID & their last target song
        cur_set = [x.strip('\n').split('|') for x in lines_gen]
    test_user = cur_set[0][0] # pull user ID. don't need song count  199810_208019
    if test_user in completed_users:
        return None
    distances = []
    for song in cur_set[1:]:
        dist=nx.astar_path_length(G, str(test_user), \
                                     str(song[0]), \
                                     weight='weight') # distance from user to target song
        distances.append((f'{test_user}_{song[0]}', dist))
    distances.sort(key=lambda x:x[1])
    predictions = []
    for i, j in enumerate(distances):
        cur_dict = {}
        cur_dict['TrackID'] = j[0]
        if i < 3:
            cur_dict['Predictor'] = 1 # sorted ascending, smaller distance means closer & should be recommended
        else:
            cur_dict['Predictor'] = 0
        predictions.append(cur_dict)

    return predictions

class Worker(threading.Thread):

    def __init__(self, q, i, *args, **kwargs):
        self.q = q
        self.i = i
        super().__init__(*args, **kwargs)

            
    def run(self):
        while True:
            try:
                j = self.q.get(timeout=3)  # 3s timeout
            except queue.Empty:
                return
            

            predictions_processed = process_user(j)
            if predictions_processed: # none will be returned if user already completed so we can skip
                write_user(predictions_processed)

            self.q.task_done()


def mt_user_predictions(infile, outfile):
    print('start time: {}'.format(datetime.now().strftime("%Y-%m-%d-%H.%M.%S")))
    start_time = time.time()

    # n_threads= int(args[0])
    n_threads = 30
    ################### add desired output columns
    with open(outfile ,'w') as oufl, open(infile, 'r', encoding='utf-8') as infl:
        oufl.write('TrackID,Predictor')
        row_count = sum(1 for row in infl)
    print('total rows:', row_count) 
    
    q = queue.Queue()
    
    n_users = int((row_count)/7)
    with open(infile) as f:
        for i in range(n_users):
            q.put_nowait(i)
    print(f'[{n_users}] users in target file. passing indices to user multithreading')
    for _ in range(n_threads):
        Worker(q, _).start()
        time.sleep(1)
    q.join()
    
    print('finished. end time: {}'.format(datetime.now().strftime("%Y-%m-%d-%H.%M.%S")))
    print('completed in {}'.format(timedelta(seconds=int(time.time() - start_time))))

    ### multithreading doesn't help. this is PCU heavy. dumby....
# mt_user_predictions(infile='ee627a-2019fall/testItem2.txt', outfile='./recommendations.csv')            

##### switch to multiprocessing

In [8]:
infile='ee627a-2019fall/testItem2.txt'
outfile='./recommendations.csv'

################### add desired output columns
with open(outfile ,'w') as oufl, open(infile, 'r', encoding='utf-8') as infl:
    oufl.write('TrackID,Predictor\n')
    row_count = sum(1 for row in infl)
print('total rows:', row_count) 
n_users = int((row_count)/7)
tasks = list(range(n_users))

pbar = tqdm.tqdm(total=len(tasks))

def run_mp(j):            
    predictions_processed = process_user(j)
    write_user(predictions_processed)
    pbar.update(1)
 
print('start time: {}'.format(datetime.now().strftime("%Y-%m-%d-%H.%M.%S")))
start_time = time.time()


num_workers = mp.cpu_count()
print('num workers avail:', num_workers)

pool = mp.Pool()
pool.imap_unordered(run_mp, tasks)



  0%|          | 0/20000 [00:00<?, ?it/s]

total rows: 140000
start time: 2022-03-15-18.12.22
num workers avail: 16


<multiprocessing.pool.IMapUnorderedIterator at 0x7fadc84fd450>

In [None]:
""" try 1  
pool = mp.Pool(num_workers)
for _ in tqdm.tqdm(pool.map(func=run_mp, iterable=tasks, chunksize=20), total=len(tasks)):
    pass

result = pool.map(func=run_mp, iterable=tasks, chunksize=2)
"""

""" try 2
# pool = mp.Pool()
# pbar = tqdm.tqdm(total=len(tasks))

# pool.imap_unordered(run_mp, tasks)
"""

"""try 3
num_workers = mp.cpu_count()
print('num workers avail:', num_workers)
pbar = tqdm.tqdm(total=len(tasks))

with mp.Pool(num_workers) as p:
    for i, r in enumerate(p.imap_unordered(run_mp, tasks)):

        pbar.update(i)

pbar.close()
"""
#### at 12:27, 14hr eta means done before 3pm
#### at 10:31, 1532125it [10:08:20, 146.29it/s], 10,500 lines written after 10 hrs. ~1000/hr = 140hrs

### Docker setup
https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html#docker

start image:
sudo docker run --rm --gpus all nvidia/cuda:11.0-base nvidia-smi
alternate
sudo nvidia-docker run -it -v ~/gits/nw-msds458/final:/wkg nvidia/cuda:11-base nvidia-smi

------
sudo docker run \
    --rm \
    -it \
    --gpus all \
    -v ~/gits/nw-msds458/final:/wkg \
    -e EXTRA_APT_PACKAGES="vim nano" \
    -e EXTRA_CONDA_PACKAGES="jq" \
    -e EXTRA_PIP_PACKAGES="networkx" \
    -p 5555:8888 \
    -p 8787:8787 \
    -p 8786:8786 \
    rapidsai/rapidsai:22.02-cuda11.0-runtime-ubuntu18.04-py3.8


In [None]:
#####################################################################################################

## Graveyard  
other distance measure attemps. even slower  
also post processing that isn't necessary if i write to file

In [6]:
listy = [('asdf', 3), ('sdfg', 2), ('dfgh', 80), ('fghj', 1), ('ghjk', 34), ('hjkl', 23)]
listy.sort(key=lambda x:x[1])
print(listy)
predicitons = []
for i, j in enumerate(listy):
    cur_dict = {}
#     cur_dict['TrackID'] = f'{test_user}_{song}'
    cur_dict['TrackID'] = f'{j[0]}'
    if i < 3:
        cur_dict['Predictor'] = 1 # sorted ascending, smaller distance means closer & should be recommended
    else:
        cur_dict['Predictor'] = 0
    predicitons.append(cur_dict)
print(predicitons)

[('fghj', 1), ('sdfg', 2), ('asdf', 3), ('hjkl', 23), ('ghjk', 34), ('dfgh', 80)]
[{'TrackID': 'fghj', 'Predictor': 0}, {'TrackID': 'sdfg', 'Predictor': 0}, {'TrackID': 'asdf', 'Predictor': 0}, {'TrackID': 'hjkl', 'Predictor': 1}, {'TrackID': 'ghjk', 'Predictor': 1}, {'TrackID': 'dfgh', 'Predictor': 1}]


In [9]:
# # connect all unconnected components with 0 weight so graph is strongly connected
# combs = list(combinations(list(G.nodes), 2))
# for comb in combs:
#     if not G.has_edge(comb[0], comb[1]):
#         G.add_edge(comb[0], comb[1], weight=0)

### memory error

In [10]:
# too big
# subax1 = plt.subplot(121)
# nx.draw(G, with_labels=True)

In [11]:
%time nx.shortest_path(G2, '1', '214765')

CPU times: user 28 µs, sys: 1 µs, total: 29 µs
Wall time: 31 µs


['1', '214765']

In [12]:
try:
    %time length=nx.astar_path_length(G, '1', '214765')
except:
    print('switching to subgraph')
    %time length=nx.astar_path_length(G2, '1', '214765')

# print(path['1']['214765'])
print(length)


CPU times: user 48 µs, sys: 1 µs, total: 49 µs
Wall time: 51 µs
100


In [None]:
%time nx.resistance_distance(G2, '1', '214765')

In [None]:
try:
    %time length2=nx.dijkstra_path_length(G, '1', '214765')
except:
    print('switching to subgraph')
    %time length2=nx.dijkstra_path_length(G2, '1', '214765')

# print(path['1']['214765'])
print(length2)


In [None]:
# %time nx.dijkstra_path(G2, '1', '214765', weight='weight')

In [None]:
# try:
#     %time path=nx.all_pairs_dijkstra_path(G)
# except:
#     print('switching to subgraph')
#     %time path=nx.all_pairs_dijkstra_path(G2)

# print(path['1']['214765'])


In [None]:
# # is this cell necessary? process df in next for predicting 1/0
# with f as open('ee627a-2019fall\\testItem2.txt'):



In [None]:
# submit to: https://www.kaggle.com/c/ee627a-2019fall/submit
df_submission = pd.read_csv('ee627a-2019fall\\sample_submission.csv')
df_submission[['user', 'track']] = df_submission['TrackID'].str.split('_', 1, expand=True)

df_submission