In [1]:
""" Load data"""
import csv
from typing import List

questions_filename = 'responses.csv'
user_info_filename = 'user_info.csv'

def normalize_question(question: str) -> str:
    return question.replace('[Recommended]', '').strip()

def tokenize_questions(questions_str) -> List[str]:
    questions = questions_str.split(',')
    return list(map(normalize_question, questions))

def row_to_name(row: List[str]) -> str:
    return row[1]

def row_to_questions(row: List[str]) -> dict:
    qtype_idx = [('serious', 2), ('funny', 3)]
    return {qtype: tokenize_questions(row[idx])
            for qtype, idx in qtype_idx}

def load_questions_by_name(filename: str) -> dict:
    with open(filename, 'r') as f:
        reader = csv.reader(f, delimiter=',')
        next(reader) # skip header
        return {row_to_name(row): row_to_questions(row) for row in reader}

def load_emails_by_name(filename: str) -> dict:
    with open(filename, 'r') as f:
        reader = csv.reader(f, delimiter=',')
        next(reader) # skip header
        return {name: email for name, email, _ in reader}

def load_cousin_names(filename: str) -> set:
    with open(filename, 'r') as f:
        reader = csv.reader(f, delimiter=',')
        next(reader) # skip header
        return {name for name, _, is_cousin in reader if is_cousin == 'y'}
    
questions_by_name = load_questions_by_name(questions_filename)
emails_by_name = load_emails_by_name(user_info_filename)
cousin_names = load_cousin_names(user_info_filename)

# Verify that we have email for everyone.
for name in questions_by_name.keys():
    assert name in emails_by_name, f"No email for {name}"

In [2]:
"""Dump emails and questions."""

for name, questions_by_type in questions_by_name.items():
#     print(f"{name} ({emails_by_name[name]})")
    print(f"{name}")
    for qtype, questions in questions_by_type.items():
        print(f"    {qtype}:")
        for q in questions:
            print(f"        {q}")
                

Carol Ann
    serious:
        What's an average day in lockdown look like for you?
        What's something you'll never take for granted again?
    funny:
        Have you watched any good movies during the lockdown?
        What's the best/worst food you've had in the lockdown?
Rene
    serious:
        What's an average day in lockdown look like for you?
        What has been the silver lining of the lockdown?
    funny:
        What has been the hardest part of the lockdown?
        What's the one thing that has NOT changed in the lockdown?
Mary (Grandma)
    serious:
        What's an average day in lockdown look like for you?
        Have you watched any good movies during the lockdown?
        What's the first thing you'll do when things fully re-open?
        Have you learned any new skills while trapped at home?
    funny:
        What's an average day in lockdown look like for you?
        What's something you'll never take for granted again?
        What's the first thing y

In [3]:
"""Transfer questions by user into cuts."""
from collections import namedtuple
import random
import queue
from typing import NamedTuple

random.seed(1760)

# Definition of one person answering one question
class Cut(NamedTuple):
    name: str
    question: str
    qtype: str

# Flatten questions into cuts; separate by type and randomize.
def get_cuts_for_type(questions_by_name: dict, qtype: str):
    cuts = []
    for name, questions_by_type in questions_by_name.items():
        for q in questions_by_type[qtype]:
            cuts.append(Cut(name, q, qtype))
    print(f"Generated {len(cuts)} {qtype} cuts.")
    random.shuffle(cuts)  # Happens in place
    return cuts

cuts_by_type = {
    'serious': get_cuts_for_type(questions_by_name, 'serious'),
    'funny': get_cuts_for_type(questions_by_name, 'funny')
}

Generated 32 serious cuts.
Generated 30 funny cuts.


In [4]:
"""Splice cuts into batches/rounds."""
import math

# Choose how many "rounds" (chunk of serious + chunk of funny)
# to have -- based on whichever we have more of.
max_cuts = max([len(cuts) for cuts in cuts_by_type.values()]) 
num_rounds = int(math.ceil(max_cuts / 3.0))
print(f"Number of rounds: {num_rounds}")

# They probably didn't divide evenly; generate cuts per round,
# and shuffle as well.
def get_cuts_per_round(total_cuts: int, total_rounds: int):
    min_cuts_per_round = total_cuts // total_rounds
    max_cuts_per_round = min_cuts_per_round + 1
    num_max_rounds = total_cuts % total_rounds
    num_min_rounds = total_rounds - num_max_rounds
    cuts_per_round = [min_cuts_per_round] * num_min_rounds + \
                     [max_cuts_per_round] * num_max_rounds
    assert sum(cuts_per_round) == total_cuts
    assert len(cuts_per_round) == total_rounds
    random.shuffle(cuts_per_round)
    return cuts_per_round

cuts_per_round = {qtype: get_cuts_per_round(len(cuts), num_rounds)
                  for qtype, cuts in cuts_by_type.items()}

Number of rounds: 11


In [5]:
# Combine all cuts into one stream of prompts.
from itertools import accumulate

# Generator for returning batches of cuts based on round.
def cuts_for_round(cuts_per_round: List[int], cuts: List[Cut]):
    assert sum(cuts_per_round) == len(cuts)
    start_indices = accumulate([0] + cuts_per_round[:-1])
    end_indices = accumulate(cuts_per_round)
    for si, ei in zip(start_indices, end_indices):
        yield cuts[si:ei]

serious_cuts_batches = cuts_for_round(cuts_per_round['serious'],
                                      cuts_by_type['serious'])
funny_cuts_batches = cuts_for_round(cuts_per_round['funny'],
                                    cuts_by_type['funny'])
    
# Each person needs to know their own cut, and adjacent ones.
class Prompt(NamedTuple):
    prev_cut: Cut
    cut: Cut
    next_cut: Cut
    
# First just build prompts w/o next/prev. Easier to add after.
partial_prompts = []
cut_batches = zip(serious_cuts_batches, funny_cuts_batches)
for serious_cuts, funny_cuts in cut_batches:
    partial_prompts += [Prompt(prev_cut=None, cut=cut, next_cut=None)
                        for cut in serious_cuts]
    partial_prompts += [Prompt(prev_cut=None, cut=cut, next_cut=None)
                        for cut in funny_cuts]
    
# Connect prevs and nexts (doubly linked list??)
prompts = []
for i, prompt in enumerate(partial_prompts):
    prev_cut = partial_prompts[i - 1].cut if i > 0 else None
    cut = prompt.cut
    next_cut = partial_prompts[i + 1].cut \
                   if i < len(partial_prompts) - 1 else None
    prompts.append(Prompt(prev_cut, cut, next_cut))

print(f"Generated list of {len(prompts)} prompts")

Generated list of 62 prompts


In [6]:
# Post process/inspect.

# Print out prompts.
def prompt_to_str(prompt: Prompt):
    if prompt.prev_cut:
        prev_str = (f"<b>{prompt.prev_cut.name}</b> just answered a "
                    f"<b>{prompt.prev_cut.qtype}</b> question, "
                    "and is now asking you, ")
    else:
        prev_str = ("You're the first cut, so the narrator (Joe?) "
                    "will ask you, ")
    cur_str = (f"<i>{prompt.cut.question}</i> to which you'll give a "
               f"{prompt.cut.qtype} answer... ")
    if prompt.next_cut:
        next_str = (f"Then, you'll ask {prompt.next_cut.name}, "
                    f"<i>{prompt.next_cut.question}</i>")
    else:
        next_str = ("And you're the last one! Just make sure there "
                    "are at least 3 seconds of silence at the end for "
                    "us to fade to black. Thanks :)")
        
    return prev_str + cur_str + next_str
    
# Make sure things are wired correctly.
print("Checking for errors (asserts).")
for prompt, next_prompt in zip(prompts[:-1], prompts[1:]):
    assert prompt.next_cut.name == next_prompt.cut.name
    assert prompt.cut.name == next_prompt.prev_cut.name

# Check a few other things that are desirable.
print("Checking for warnings (prints)")
for i, prompt in enumerate(prompts):
    if prompt.next_cut and prompt.cut.name == prompt.next_cut.name:
        print(f"Warning: bak-to-back identical speaker {i}")    


Checking for errors (asserts).


In [7]:
# Print all prompts in order.

for i, p in enumerate(prompts):
    print(f"\nCut {i + 1} for {p.cut.name}:\n", prompt_to_str(p))


Cut 1 for John (Grandpa):
 You're the first cut, so the narrator (Joe?) will ask you, <i>What's the first thing you'll do when things fully re-open?</i> to which you'll give a serious answer... Then, you'll ask Gabi, <i>What's the most creative entertainment you've devised?</i>

Cut 2 for Gabi:
 <b>John (Grandpa)</b> just answered a <b>serious</b> question, and is now asking you, <i>What's the most creative entertainment you've devised?</i> to which you'll give a serious answer... Then, you'll ask Reagan, <i>What's an average day in lockdown look like for you?</i>

Cut 3 for Reagan:
 <b>Gabi</b> just answered a <b>serious</b> question, and is now asking you, <i>What's an average day in lockdown look like for you?</i> to which you'll give a serious answer... Then, you'll ask Bill, <i>What's the one thing that has NOT changed in the lockdown?</i>

Cut 4 for Bill:
 <b>Reagan</b> just answered a <b>serious</b> question, and is now asking you, <i>What's the one thing that has NOT changed i

In [8]:
# Generate the CSV needed to do a cool mail merge.
from collections import defaultdict
from jinja2 import Template
import os

output_dir = './emails'

# Combine prompts into a dict by user.
prompt_strs_by_name = defaultdict(list)
for i, p in enumerate(prompts):
    prompt_strs_by_name[p.cut.name].append(prompt_to_str(p))
    
# Figure out most prompts (and who).
# Actually don't need this anymore, but it's fun to see...
max_prompts, name = max((len(prompts), name)
                       for name,prompts in prompt_strs_by_name.items())
print(f"Most prompts was {max_prompts} ({name})")

# Open jinja template
with open('email.tmpl.html') as file:
    template = Template(file.read())

# Call template for one person.
def make_email_html(name, prompts):
    is_cousin = name in cousin_names
    return template.render(name=name,
                           prompts=prompts,
                           is_cousin=is_cousin)


def write_email_to_file(name, email_html):
    output_path = os.path.join(output_dir, f"{name}.html")
    with open(output_path, 'w') as f:
        f.write(email_html)
    print(f"Wrote {output_path}")

Most prompts was 7 (Mary (Grandma))


In [9]:
from __future__ import print_function
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.compose']

def create_service():
    """Get access to gmail API."""
    creds = None
    # The file token.pickle stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                '/Users/joe/Downloads/credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)

    return build('gmail', 'v1', credentials=creds)


service = create_service()

In [10]:
from email.mime.text import MIMEText
import sys
import base64

def create_message(email_address, email_body_html):
    """Create a message for an email."""
    message = MIMEText(email_body_html, 'html')
    message['to'] = email_address
    message['from'] = "joe.j.polin@gmail.com"
    message['subject'] = "Lockdown Life: Lights, Camera, Action!"
    return {'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}


def create_draft(service, message_body):
    try:
        message = {'message': message_body}
        draft = service.users().drafts().create(userId="me", body=message).execute()
        print("Draft created.")
    except:
        print('An error occurred: %s' % sys.exc_info()[0])
        print(sys.exc_info())


In [11]:
for name, prompts in prompt_strs_by_name.items():
    # Optional: Whitelist person
#     if name != "Paige":
#         continue
    email_html = make_email_html(name, prompts)
    # Create draft in gmail.
    if True:
        raw_msg = create_message(emails_by_name[name], email_html)
        create_draft(service, raw_msg)        
    # Write to file.
    if True:
        write_email_to_file(name, email_html)
    

Draft created.
Wrote ./emails/John (Grandpa).html
Draft created.
Wrote ./emails/Gabi.html
Draft created.
Wrote ./emails/Reagan.html
Draft created.
Wrote ./emails/Bill.html
Draft created.
Wrote ./emails/Malinda.html
Draft created.
Wrote ./emails/Adrian.html
Draft created.
Wrote ./emails/Mary Beth.html
Draft created.
Wrote ./emails/Paige.html
Draft created.
Wrote ./emails/Mary (Grandma).html
Draft created.
Wrote ./emails/Carol Ann.html
Draft created.
Wrote ./emails/Joe.html
Draft created.
Wrote ./emails/Ron.html
Draft created.
Wrote ./emails/Rene.html
Draft created.
Wrote ./emails/David.html
