In [1]:
import requests
import os
import json
import pandas as pd
import csv

In [2]:
def set_token(path: str):
    """
    Set os env variable 'TOKEN' as bearer_token.

    Param 'path': path to .json file containing developer account bearer_token value.
        If 'path' is empty or None, no file is read and 'TOKEN' is left untouched.
    Returns: None. A status message is printed instead.
    Raises:
        OSError: the file at 'path' cannot be opened.
        json.JSONDecodeError: the file does not contain valid JSON.
        KeyError: the JSON document has no 'bearer_token' key.
    Pkg: os, json
    """
    if not path:
        # Nothing was loaded, so do not report success.
        print('No credentials path given; TOKEN environment variable was not set.')
        return None

    # The context manager closes the file handle even if JSON parsing fails.
    with open(path) as creds_file:
        creds = json.load(creds_file)

    os.environ['TOKEN'] = creds['bearer_token']
    print('Set TOKEN enviroment varible.')
    return None



def create_header():
    """
    Create header dictionary to pass into API requests.

    Reads the bearer token from os env variable 'TOKEN'.
    Returns: dict with 'Content-type' and 'Authorization' entries.
    Raises: KeyError if 'TOKEN' is missing or empty.
    Helpers: set_token()
    Pkgs: os
    """
    token = os.environ.get('TOKEN')
    if not token:
        # Fail with an actionable message instead of an unbound local
        # (empty token) or a bare KeyError (missing token).
        raise KeyError("TOKEN environment variable is not set; call set_token() first.")

    return {
        "Content-type": "application/json",
        "Authorization": "Bearer {}".format(token),
    }



def add_rules(rule: str, timeout: float = 0.5):
    """
    Add query rule to rule set stored with twitter API.

    Param 'rule': rule to add to API with POST.
    Param 'timeout': seconds passed to requests.post as its timeout.
        Defaults to 0.5, the value that was previously hard-coded.
    Returns: the response body decoded from JSON.
    Helpers: create_header()
    Pkgs: requests, json
    """
    stream_url = 'https://api.twitter.com/2/tweets/search/stream/rules'
    body_params = {"add": [{"value": rule}]}

    response = requests.post(
        url=stream_url,
        headers=create_header(),
        json=body_params,
        timeout=timeout,
    )

    return response.json()



def list_rules(scope=None, timeout: float = 0.5):
    """
    List rules stored with twitter API.

    Param 'scope':
        None = Default, returns list of rule 'ids' and 'values'
        ids = Returns list of rule 'ids' only.
        values = Returns list of rule 'values' only.
    Param 'timeout': seconds passed to requests.get as its timeout.
        Defaults to 0.5, the value that was previously hard-coded.
    Returns: a list as selected by 'scope'. If the response has no 'data' key,
        prints 'Rules list is empty.' and returns None.
    Raises: ValueError if 'scope' is not one of None, 'ids', 'values'.
    Helpers: create_header()
    Pkgs: requests, json
    """
    # Validate before any network traffic, and raise rather than return the
    # exception object so callers cannot mistake it for a result.
    if scope not in (None, 'ids', 'values'):
        raise ValueError("Scope setting not valid. [Default=None, 'ids', 'values']")

    stream_url = 'https://api.twitter.com/2/tweets/search/stream/rules'
    response = requests.get(url=stream_url, headers=create_header(), timeout=timeout)
    payload = response.json()  # decode the body once and reuse it

    if 'data' not in payload:
        print('Rules list is empty.')
        return None

    rules = payload['data']
    if scope is None:
        return rules

    # 'ids' selects each rule's 'id' field, 'values' its 'value' field.
    field = 'id' if scope == 'ids' else 'value'
    return [rule[field] for rule in rules]
   

        
def delete_rules(ids: list, timeout: float = 1):
    """
    Delete list of rules with twitter API.

    Param 'ids': list of rule ids from twitter API. If None or empty, no
        request is sent and an empty dict is returned.
    Param 'timeout': seconds passed to requests.post as its timeout.
        Defaults to 1, the value that was previously hard-coded.
    Returns: the response body decoded from JSON, or {} when there was
        nothing to delete.
    Helpers: create_header()
    Pkgs: requests, json
    """
    # list_rules('ids') returns None when no rules are stored; skip the
    # round trip instead of posting a body without any ids.
    if not ids:
        return {}

    stream_url = 'https://api.twitter.com/2/tweets/search/stream/rules'
    body_params = {"delete": {"ids": ids}}
    response = requests.post(
        url=stream_url,
        headers=create_header(),
        json=body_params,
        timeout=timeout,
    )

    return response.json()

In [317]:
#PYTHON TESTS BELOW

In [320]:
# Manual smoke test against the live API: load credentials, add one rule,
# list the rules, delete every stored rule, then list again.
# NOTE(review): 'path' is a machine-specific absolute path; adjust it before running elsewhere.
path = '/Users/ryanrogala/Code/creds/tw_dev_001/twcreds.json'
# Stores the file's 'bearer_token' value in os.environ['TOKEN'] for create_header().
set_token(path)
rule = "broncos OR @broncos"
# The return values of the next two calls are discarded (not assigned, not the cell's last expression).
add_rules(rule)
list_rules()
# list_rules('ids') supplies the ids of the rules currently stored, which are then deleted.
delete_rules(list_rules('ids'))
# With no rules left, list_rules() prints 'Rules list is empty.' and returns None.
list_rules()

Set TOKEN enviroment varible.
Rules list is empty.
